diff --git a/SpaceShuttle/PPO.py b/SpaceShuttle/PPO.py new file mode 100644 index 0000000..e96cb03 --- /dev/null +++ b/SpaceShuttle/PPO.py @@ -0,0 +1,356 @@ +""" +PPO implementation from +https://github.com/Lizhi-sjtu/DRL-code-pytorch/tree/main/5.PPO-continuous + +Added the POLICEd layers from +https://github.com/RandallBalestriero/POLICE +Modified the POLICE algorithm: + +in function 'enforce_constraint_forward' +sign = agreement[:, invalid_ones].float().sum(0).sub_(C/2 + 1e-3).sign_() +replaces +sign = agreement[:, invalid_ones].half().sum(0).sub_(C/2 + 1e-3).sign_() +as the half() was reducing the precision and the sign could end up being 0, +which renders this function ineffective. + +To help train the POLICEd policy, it starts with a buffer of size 0, i.e., no +affine region. Once the reward is high enough the affine region is slowly enlarged until +reaching the desired size, see Enlarging_POLICEd_Gaussian_Actor +""" + + +import torch +import torch.nn.functional as F +from torch.utils.data.sampler import BatchSampler, SubsetRandomSampler +import torch.nn as nn +from torch.distributions import Normal + + + +def enforce_constraint_forward(x, W, b, C): + """Perform a forward pass on the given `x` argument which contains both the `C` vertices + describing the region `R` onto which the DNN is constrained to stay affine, and the mini-batch""" + h = x @ W.T + b + V = h[-C:] + with torch.no_grad(): + agreement = V > 0 + invalid_ones = agreement.all(0).logical_not_().logical_and_(agreement.any(0)) + sign = agreement[:, invalid_ones].float().sum(0).sub_(C/2 + 1e-3).sign_() + + extra_bias = (V[:, invalid_ones] * sign - 1e-4).amin(0).clamp(max=0) * sign + h[:, invalid_ones] -= extra_bias + return h + + +class ConstrainedLayer(nn.Linear): + def forward(self, x, C): + return enforce_constraint_forward(x, self.weight, self.bias, C) + + + +### With buffer starting from 0 and progressively enlarging +class Enlarging_POLICEd_Gaussian_Actor(nn.Module): + def __init__(self, args): + super(Enlarging_POLICEd_Gaussian_Actor, self).__init__() + self.nb_layers = 3 + self.width = args.hidden_width + self.layer_0 = ConstrainedLayer(args.state_dim, self.width) + self.layer_1 = ConstrainedLayer(self.width, self.width) + self.layer_2 = ConstrainedLayer(self.width, args.action_dim) + + self.log_std = nn.Parameter(torch.zeros(1, args.action_dim)) # We use 'nn.Parameter' to train log_std automatically + self.buffer_vertices = torch.tensor(args.buffer_vertices, dtype=torch.float32) + self.num_vertices = self.buffer_vertices.shape[0] + self.use_state_norm = args.use_state_norm + + # Orthogonal layer initialization + orthogonal_init(self.layer_0) + orthogonal_init(self.layer_1) + orthogonal_init(self.layer_2, gain=0.01) + + ### Affine buffer + self.buffer_center = torch.ones((self.num_vertices, 1)) @ self.buffer_vertices.mean(axis=0, keepdims=True) + ### Initial buffer vertices of volume 0 + self.vertices = self.buffer_center.clone() + + ### Counting iterations to slowly enlarge the buffer once allowed + self.iter = 0 + self.max_iter = args.max_iter_enlargment + ### Initially not allowed to enlarge the buffer + self.enlarge_buffer = False + + def enlarging_buffer(self): + """Iteratively enlarges the affine buffer until reaching the desired volume.""" + self.iter += 1 + if self.iter > self.max_iter: + self.enlarge_buffer = False + print("Buffer reached full size") + + if self.iter % 10 == 0: + a = self.iter/self.max_iter + self.vertices = a*self.buffer_vertices + (1-a)*self.buffer_center + + + def forward(self, s, 
state_norm=None, update=True): + if self.enlarge_buffer: + self.enlarging_buffer() + + if self.use_state_norm: + s = state_norm(s, update=update) + v = state_norm(self.vertices, update=False) + with torch.no_grad(): + s = torch.cat( (s, v), dim=0) + else: + with torch.no_grad(): + s = torch.cat( (s, self.vertices), dim=0) + + s = torch.relu(self.layer_0(s, self.num_vertices)) + s = torch.relu(self.layer_1(s, self.num_vertices)) + s = self.layer_2(s, self.num_vertices) + mean = s[:-self.num_vertices] + return mean + + def get_dist(self, s, state_norm): + mean = self.forward(s, state_norm) + log_std = self.log_std.expand_as(mean) # To make 'log_std' have the same dimension as 'mean' + std = torch.exp(log_std) # The reason we train the 'log_std' is to ensure std=exp(log_std)>0 + dist = Normal(mean, std) # Get the Gaussian distribution + return dist + + +### Original without buffer enlargment +class POLICEd_Gaussian_Actor(nn.Module): + def __init__(self, args): + super(POLICEd_Gaussian_Actor, self).__init__() + self.nb_layers = 3 + self.width = args.hidden_width + self.layer_0 = ConstrainedLayer(args.state_dim, self.width) + self.layer_1 = ConstrainedLayer(self.width, self.width) + self.layer_2 = ConstrainedLayer(self.width, args.action_dim) + + self.log_std = nn.Parameter(torch.zeros(1, args.action_dim)) # We use 'nn.Parameter' to train log_std automatically + self.buffer_vertices = torch.tensor(args.buffer_vertices, dtype=torch.float32) + self.num_vertices = self.buffer_vertices.shape[0] + self.use_state_norm = args.use_state_norm + + # Orthogonal layer initialization + orthogonal_init(self.layer_0) + orthogonal_init(self.layer_1) + orthogonal_init(self.layer_2, gain=0.01) + + def forward(self, s, state_norm=None, update=True): + if self.use_state_norm: + s = state_norm(s, update=update) + v = state_norm(self.buffer_vertices, update=False) + with torch.no_grad(): + s = torch.cat( (s, v), dim=0) + else: + with torch.no_grad(): + s = torch.cat( (s, self.buffer_vertices), dim=0) + + s = torch.relu(self.layer_0(s, self.num_vertices)) + s = torch.relu(self.layer_1(s, self.num_vertices)) + s = self.layer_2(s, self.num_vertices) + mean = s[:-self.num_vertices] + return mean + + def get_dist(self, s, state_norm): + mean = self.forward(s, state_norm) + log_std = self.log_std.expand_as(mean) # To make 'log_std' have the same dimension as 'mean' + std = torch.exp(log_std) # The reason we train the 'log_std' is to ensure std=exp(log_std)>0 + dist = Normal(mean, std) # Get the Gaussian distribution + return dist + + + +# Orthogonal initialization of NN layers +def orthogonal_init(layer, gain=1.0): + nn.init.orthogonal_(layer.weight, gain=gain) + nn.init.constant_(layer.bias, 0) + + +# unPOLICEd baseline Gaussian actor +class Gaussian_Actor(nn.Module): + def __init__(self, args): + super(Gaussian_Actor, self).__init__() + self.nb_layers = 3 + self.width = args.hidden_width + self.layer_0 = nn.Linear(args.state_dim, self.width) + self.layer_1 = nn.Linear(self.width, self.width) + self.layer_2 = nn.Linear(self.width, args.action_dim) + self.log_std = nn.Parameter(torch.zeros(1, args.action_dim)) # We use 'nn.Parameter' to train log_std automatically + self.use_state_norm = args.use_state_norm + + # Orthogonal initialization + orthogonal_init(self.layer_0) + orthogonal_init(self.layer_1) + orthogonal_init(self.layer_2, gain=0.01) + + def forward(self, s, state_norm=None): + if self.use_state_norm: + s = state_norm(s) + s = torch.relu(self.layer_0(s)) + s = torch.relu(self.layer_1(s)) + mean = self.layer_2(s) 
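+        # Plain forward pass: unlike the POLICEd actors above, no buffer vertices are
+        # appended to the batch here, so no affine region is enforced for this baseline.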
+ return mean + + def get_dist(self, s, state_norm): + mean = self.forward(s, state_norm) + log_std = self.log_std.expand_as(mean) # To make 'log_std' have the same dimension as 'mean' + std = torch.exp(log_std) # The reason we train the 'log_std' is to ensure std=exp(log_std)>0 + dist = Normal(mean, std) # Get the Gaussian distribution + return dist + + +class Critic(nn.Module): + def __init__(self, args): + super(Critic, self).__init__() + self.fc1 = nn.Linear(args.state_dim, args.hidden_width) + self.fc2 = nn.Linear(args.hidden_width, args.hidden_width) + self.fc3 = nn.Linear(args.hidden_width, 1) + + # Orthogonal initialization + orthogonal_init(self.fc1) + orthogonal_init(self.fc2) + orthogonal_init(self.fc3) + + def forward(self, s): + s = torch.relu(self.fc1(s)) + s = torch.relu(self.fc2(s)) + v_s = self.fc3(s) + return v_s + + + +class PPO(): + def __init__(self, args): + self.max_action = args.max_action + self.batch_size = args.batch_size + self.mini_batch_size = args.mini_batch_size + self.max_train_steps = args.max_train_steps + self.lr_a = args.lr_a # Learning rate of actor + self.lr_c = args.lr_c # Learning rate of critic + self.gamma = args.gamma # Discount factor + self.lamda = args.lamda # GAE parameter + self.epsilon = args.epsilon # PPO clip parameter + self.K_epochs = args.K_epochs # PPO parameter + self.entropy_coef = args.entropy_coef # Entropy coefficient + self.set_adam_eps = args.set_adam_eps + self.use_grad_clip = args.use_grad_clip + self.use_lr_decay = args.use_lr_decay + self.use_adv_norm = args.use_adv_norm + self.use_state_norm = args.use_state_norm + + if args.POLICEd: + if args.enlarging_buffer: + self.actor = Enlarging_POLICEd_Gaussian_Actor(args) + else: + self.actor = POLICEd_Gaussian_Actor(args) + else: + self.actor = Gaussian_Actor(args) + self.critic = Critic(args) + + if self.set_adam_eps: # Trick 9: set Adam epsilon=1e-5 + self.optimizer_actor = torch.optim.Adam(self.actor.parameters(), lr=self.lr_a, eps=1e-5) + self.optimizer_critic = torch.optim.Adam(self.critic.parameters(), lr=self.lr_c, eps=1e-5) + else: + self.optimizer_actor = torch.optim.Adam(self.actor.parameters(), lr=self.lr_a) + self.optimizer_critic = torch.optim.Adam(self.critic.parameters(), lr=self.lr_c) + + def evaluate(self, s, state_norm=None): # When evaluating the policy, we only use the mean + if self.use_state_norm: + assert state_norm is not None, "State normalization is required" + s = torch.unsqueeze(torch.tensor(s, dtype=torch.float), 0) + a = self.actor(s, state_norm).detach().numpy().flatten() + return a + + def choose_action(self, s, state_norm=None): + if self.use_state_norm: + assert state_norm is not None, "State normalization is required" + s = torch.unsqueeze(torch.tensor(s, dtype=torch.float), 0) + + with torch.no_grad(): + dist = self.actor.get_dist(s, state_norm) + a = dist.sample() # Sample the action according to the probability distribution + a_logprob = dist.log_prob(a) # The log probability density of the action + return a.numpy().flatten(), a_logprob.numpy().flatten() + + def update(self, replay_buffer, total_steps, state_norm): + s, a, a_logprob, r, s_, dw, done = replay_buffer.numpy_to_tensor() # Get training data + batch_size = a.shape[0] + """ + Calculate the advantage using GAE + 'dw=True' means dead or win, there is no next state s' + 'done=True' represents the terminal of an episode(dead or win or reaching the max_episode_steps). 
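+        The recursion implemented below is
+            delta_t = r_t + gamma * (1 - dw_t) * V(s_{t+1}) - V(s_t)
+            A_t = delta_t + gamma * lambda * (1 - done_t) * A_{t+1}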
When calculating the adv, if done=True, gae=0 + """ + adv = [] + gae = 0 + with torch.no_grad(): # adv and v_target have no gradient + vs = self.critic(s) + vs_ = self.critic(s_) + deltas = r + self.gamma * (1.0 - dw) * vs_ - vs + for delta, d in zip(reversed(deltas.flatten().numpy()), reversed(done.flatten().numpy())): + gae = delta + self.gamma * self.lamda * gae * (1.0 - d) + adv.insert(0, gae) + adv = torch.tensor(adv, dtype=torch.float).view(-1, 1) + v_target = adv + vs + if self.use_adv_norm: # Trick 1:advantage normalization + adv = ((adv - adv.mean()) / (adv.std() + 1e-5)) + + # Optimize policy for K epochs: + for _ in range(self.K_epochs): + # Random sampling and no repetition. 'False' indicates that training will continue even if the number of samples in the last time is less than mini_batch_size + # for index in BatchSampler(SubsetRandomSampler(range(self.batch_size)), self.mini_batch_size, False): + for index in BatchSampler(SubsetRandomSampler(range(batch_size)), self.mini_batch_size, False): + dist_now = self.actor.get_dist(s[index], state_norm) + dist_entropy = dist_now.entropy().sum(1, keepdim=True) # shape(mini_batch_size X 1) + a_logprob_now = dist_now.log_prob(a[index]) + # a/b=exp(log(a)-log(b)) In multi-dimensional continuous action space,we need to sum up the log_prob + ratios = torch.exp(a_logprob_now.sum(1, keepdim=True) - a_logprob[index].sum(1, keepdim=True)) # shape(mini_batch_size X 1) + + surr1 = ratios * adv[index] # Only calculate the gradient of 'a_logprob_now' in ratios + surr2 = torch.clamp(ratios, 1 - self.epsilon, 1 + self.epsilon) * adv[index] + actor_loss = -torch.min(surr1, surr2) - self.entropy_coef * dist_entropy # Trick 5: policy entropy + # Update actor + self.optimizer_actor.zero_grad() + actor_loss.mean().backward() + if self.use_grad_clip: # Trick 7: Gradient clip + torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5) + self.optimizer_actor.step() + + v_s = self.critic(s[index]) + critic_loss = F.mse_loss(v_target[index], v_s) + # Update critic + self.optimizer_critic.zero_grad() + critic_loss.backward() + if self.use_grad_clip: # Trick 7: Gradient clip + torch.nn.utils.clip_grad_norm_(self.critic.parameters(), 0.5) + self.optimizer_critic.step() + + if self.use_lr_decay: # Trick 6:learning rate Decay + self.lr_decay(total_steps) + + def lr_decay(self, total_steps): + lr_a_now = self.lr_a * (1 - total_steps / self.max_train_steps) + lr_c_now = self.lr_c * (1 - total_steps / self.max_train_steps) + for p in self.optimizer_actor.param_groups: + p['lr'] = lr_a_now + for p in self.optimizer_critic.param_groups: + p['lr'] = lr_c_now + + + def save(self, filename): + torch.save(self.critic.state_dict(), filename + "_critic") + torch.save(self.optimizer_critic.state_dict(), filename + "_critic_optimizer") + + torch.save(self.actor.state_dict(), filename + "_actor") + torch.save(self.optimizer_actor.state_dict(), filename + "_actor_optimizer") + + + def load(self, filename): + self.critic.load_state_dict(torch.load(filename + "_critic")) + self.optimizer_critic.load_state_dict(torch.load(filename + "_critic_optimizer")) + + self.actor.load_state_dict(torch.load(filename + "_actor")) + self.optimizer_actor.load_state_dict(torch.load(filename + "_actor_optimizer")) + diff --git a/SpaceShuttle/SpaceShuttle.py b/SpaceShuttle/SpaceShuttle.py new file mode 100644 index 0000000..89abfe3 --- /dev/null +++ b/SpaceShuttle/SpaceShuttle.py @@ -0,0 +1,440 @@ +# -*- coding: utf-8 -*- +""" +Created on Fri Mar 1 10:25:36 2024 + +@author: Jean-Baptiste Bouvier 
+ +Space Shuttle system taken from +"Optimal Online Path Planning for Approach and Landing Guidance" +by Ali Heydari and S. N. Balakrishnan +""" + +import copy +import torch +import numpy as np + + +class ShuttleEnv(): + """ + Space Shuttle environment + 1 action 3 states available in 2 state representations (s_ and x_) + Relative degree 2 + + | Num | Name | Action | Min | Max | Unit| + |-----|-------|-----------------|-------|------|-----| + | 0 | u | control | -1 | 1 | | + + alpha = 30 * ( u + 0.8 ) + | 0 | alpha | angle of attack | 0 | 50 | deg | + + x_state (used for dynamics is an internal state) + | Num | Name | Observation | Min | Max | Unit | + | --- | ----- | -------------------| ----- | ---- | -----| + | 0 | v | velocity | 0 | Inf | m/s | + | 1 | gamma | flight path angle | -pi/2 | pi/2 | rad | + | 2 | h | altitude | 0 | Inf | m/s | + + s_state (the external state used for the buffer composed of the output iterated derivatives) + | Num | Name | Observation | Min | Max | Unit| + | --- | ----- | ----------------------------------| ----- | ---- |-----| + | 0 | y | output = -h | -Inf | Inf | m | + | 1 | y' | output derivative = -v sin(gamma) | -Inf | Inf | m/s | + | 2 | gamma | flight path angle | -pi/2 | pi/2 | rad | + + """ + + def __init__(self, args): + + self.max_episode_steps = 500 + self.reward_threshold = -args.s2_min # [ft/s] for vertical velocity at landing + self.relative_degree = 2 + self.state_size = 3 + self.action_size = 1 + + self.dt = 0.1 # [s] timestep + self.action_max = 1 # [] (control input) + + self.CD0 = 0.0975 # [] zero-lift drag coefficient + self.CL0 = 2.3 # [] zero-angle-of-attack lift coefficient + self.Ki = 0.1819 # [] lift-induced drag coefficient parameter + + ###### Imperial system + self.Sm = 0.912 # [ft^2/slug] shuttle aero area / mass + self.H = 8.5e3*3.28 # [ft] altitude scale for the atmospheric pressure + self.g = 32.174 # [ft/s^2] Earth’s gravitational acceleration + self.rho_0 = 0.0027 # [slugs/ft^3] sea-level air density + + # Transformation between input u in [-1, 1] and angle of attack alpha + self.u_to_alpha_slope = 30. 
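+        # i.e. alpha_deg = u_to_alpha_slope * (u + u_to_alpha_offset),
+        # so u = -0.8 maps to alpha = 0 deg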
+ self.u_to_alpha_offset = 0.8 + + self.initial_range = False # True + + if self.initial_range: # Initial state belongs to a range + self.v_0_min = 370 # [ft/s] + self.v_0_max = 400 # [ft/s] + self.gamma_0_min = -10*np.pi/180 # [rad] + self.gamma_0_max = -30*np.pi/180 # [rad] + self.h_0_min = 500 # [ft] + self.h_0_max = 500 # [ft] + self.x_0_min = np.array([self.v_0_min, self.gamma_0_min, self.h_0_min]) + self.x_0_max = np.array([self.v_0_max, self.gamma_0_max, self.h_0_max]) + + else: # Initial state around a reference state + self.v_0_min = 300 # [ft/s] initial velocity + self.gamma_0 = -30*np.pi/180 # [rad] initial flight path angle + self.h_0 = 10000 # [ft] initial altitude + self.x_0_min = np.array([self.v_0_min, self.gamma_0, self.h_0]) + self.x_reset_bound = 0.001 # uniform noise on the reset + + + ### Buffer parameters + self.y_min = args.y_min + self.y_max = args.y_max + self.y_dot_max = args.y_dot_max + self.s2_min = args.s2_min + # self.s3_min = args.s3_min # gamma [rad] + # self.s3_max = args.s3_max # gamma [rad] + self.beta = (self.y_dot_max - self.s2_min)/(self.y_max - self.y_min) + self.beta_offset = self.y_dot_max + self.beta * self.y_min + + ### Sanity checks + assert abs(self.y_dot_max - (self.beta_offset - self.beta * self.y_min)) < 1e-8 + assert abs(self.s2_min - (self.beta_offset - self.beta * self.y_max)) < 1e-8 + + self.nb_buffer_partitions = args.nb_buffer_partitions + self._buffer_vertices() + # For handmade buffer this does not work + self.s3_min = self.buffer_vertices[:, 2].min() # gamma [rad] + self.s3_max = self.buffer_vertices[:, 2].max() # gamma [rad] + self.buffer_min = np.array([self.y_min, self.s2_min, self.s3_min]) + + # self._epsilon() # [] affine approximation linear + self._epsilon_partition() + if self.nb_buffer_partitions == 1: + self.all_buffer_vertices = self.buffer_vertices + else: + self.all_buffer_vertices = np.concatenate((self.buffer_vertices, self.extra_vertices), axis=0) + + def action_to_alpha(self, action): + """Calculates angle of attack alpha from action u in [-1, 1]""" + u = action.clip(-self.action_max, self.action_max) + alpha = self.u_to_alpha_slope*(u + self.u_to_alpha_offset) + return alpha + + + def alpha_to_action(self, alpha): + """Calculates action u in [-1, 1] from angle of attack alpha""" + return alpha/self.u_to_alpha_slope - self.u_to_alpha_offset + + + def s(self, x_state): + """Transforms the x_state into the s_state""" + v, gamma, h = x_state + s_state = np.array([-h, -v*np.sin(gamma), gamma]) + return s_state + + + def x(self, s_state): + """Transforms the s_state into the x_state""" + y, y_dot, gamma = s_state + x_state = np.array([-y_dot/np.sin(gamma), gamma, -y]) + return x_state + + + def out_of_bounds(self): + """Verifies whether the x_state is out of bounds""" + v, gamma, h = self.x_state + return abs(gamma) > np.pi/2 or abs(v) > 1000 + + + + def step(self, action): + """step for given action. 
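+        Integrates the dynamics over one timestep self.dt with an explicit Euler step
+        and returns (s_state, reward, done, repulsion_respect).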
+ Extra input: if alpha_in = True, then action = alpha [deg]""" + self.episode_step += 1 + previous_s_state = self.s_state + alpha_deg = self.action_to_alpha(action)[0] + + v, gamma, h = self.x_state + + rho = self.rho_0 * np.exp(-h/self.H) # air density + alpha_rad = alpha_deg * np.pi/180 + CL = self.CL0 * np.sin(alpha_rad)**2 * np.cos(alpha_rad) # lift coefficient + CD = self.CD0 + self.Ki * CL**2 # drag coefficient + + dv = -0.5 * rho * v**2 * self.Sm * CD - self.g * np.sin(gamma) + dgamma = 0.5 * rho * v * self.Sm * CL - self.g * np.cos(gamma)/v + dh = v * np.sin(gamma) + self.x_state += self.dt * np.array([dv, dgamma, dh]) + self.s_state = self.s(self.x_state) + + v, gamma, h = self.x_state # updated state + reward = -10*np.maximum(abs(action) - self.action_max, 0.)[0] # keep action in admissible range + out = self.out_of_bounds() + reward -= out*50000 # penalize if state grows out of bounds + reward -= 0.2*abs(alpha_deg - self.previous_alpha_deg) # penalize chattering + + if self.in_buffer(s_state = previous_s_state): + repulsion_respect = self.repulsion_respect(previous_s_state) + else: + repulsion_respect = False # True + + done = self.s_state[0] > self.y_max or out or self.in_landing_box() + if self.episode_step >= self.max_episode_steps: + done = True + + self.previous_alpha_deg = alpha_deg + return copy.deepcopy(self.s_state), reward, done, repulsion_respect + + + # if small altitude and small vertical velocity, we are good + def in_landing_box(self): + """Verifies whether the state is in the landing box, i.e., below the buffer""" + return self.s_state[0] > self.y_min and self.s_state[1] < self.s2_min + + def final_reward(self): + """Calculates the reward at the last time step. + Penalize altitude h = s[0] and penalize final vertical velocity h_dot = s[1] + """ + return -abs(self.s_state[0]) - abs(self.s_state[1]) + + + def reset(self): + """Resets the state of the environment around a base x_state""" + if self.initial_range: + self.x_state = self.x_0_min + (self.x_0_max - self.x_0_min) * np.random.rand(self.state_size) + else: + self.x_state = self.x_0_min*(1 + self.x_reset_bound*np.random.rand(self.state_size)) + + self.previous_alpha_deg = 0 # [deg] value of previous alpha to discourage chattering + self.episode_step = 0 + self.s_state = self.s(self.x_state) + return copy.deepcopy(self.s_state) + + + def x_reset_to(self, x_state): + """Resets the state of the environment to a specified x_state""" + self.x_state = copy.deepcopy(x_state) + self.s_state = self.s(self.x_state) + self.previous_alpha_deg = 0 # [deg] value of previous alpha to discourage chattering + self.episode_step = 0 + return copy.deepcopy(self.s_state) + + + def s_reset_to(self, s_state): + """Resets the state of the environment to a specified s_state""" + self.s_state = copy.deepcopy(s_state) + self.x_state = self.x(s_state) + self.previous_alpha_deg = 0 # [deg] value of previous alpha to discourage chattering + self.episode_step = 0 + return s_state + + + def repulsion(self, previous_s_state): + """Calculates the actuated state and its desired maximal value for repulsion""" + s_r = self.s_state[self.relative_degree-1] + y_r = (s_r - previous_s_state[self.relative_degree-1])/self.dt + tol = -2*self.eps - s_r * self.beta + return y_r, tol + + def repulsion_respect(self, previous_s_state): + """Verifies whether the repulsion condition holds.""" + y_r, tol = self.repulsion(previous_s_state) + return y_r <= tol + + + def compute_buffer_max(self, s_state): + """Calculates the vector of maximal values of the buffer 
at a given s_state""" + self.buffer_max = np.array([self.y_max, + self.beta_offset - self.beta * s_state[0], + self.s3_max]) + return self.buffer_max + + + def in_buffer(self, x_state=None, s_state=None): + """Verifies whether the x_state is in the buffer.""" + if x_state is not None: + s_state = self.s(x_state) + + ### Doesn't work with the handmade buffer, it makes gamma interval + ### larger than it actually is, so extra states will be declared in-buffer + ### even if they are not in the handmade buffer. + if (s_state >= self.buffer_min).all(): + b_max = self.compute_buffer_max(s_state) + return (s_state <= b_max).all() + + return False + + + def update_epsilon(self, agent, state_norm): + """Updates epsilon using the policy 'agent' instead of using any admissible control input. + Needs state_norm to call the agent policy""" + self._epsilon_partition(agent, state_norm) + return self.eps + + + def _buffer_vertices(self): + """Calculates the buffer vertices to make them feasible flight condidtions. + The buffer cannot encompass the whole range of velocities since low speeds make the shuttle + crash no matter what. The shuttle has to be physically capable of landing + from all the vertices of the buffer, which requires some fine-tuning vertices.""" + + self.buffer_vertices = np.zeros((6, 3)) + + ###################### Vertex 1 ################################# + h = -self.y_max # 0ft/s + h_dot = -self.s2_min # -6ft/s + v = 250. # ft/s + gamma = np.arcsin(h_dot/v) + self.buffer_vertices[0] = self.s(np.array([v, gamma, h])) + + ###################### Vertex 2 ################################# + h = -self.y_max # 0ft/s + h_dot = -self.s2_min # -6ft/s + v = 300. # ft/s + gamma = np.arcsin(h_dot/v) + self.buffer_vertices[1] = self.s(np.array([v, gamma, h])) + + ###################### Vertex 3 ################################# + h = -self.y_min # 50ft/s + h_dot = -self.s2_min # -6ft/s + v = 250. # ft/s + gamma = np.arcsin(h_dot/v) + self.buffer_vertices[2] = self.s(np.array([v, gamma, h])) + + ###################### Vertex 4 ################################# + h = -self.y_min # 50ft/s + h_dot = -self.s2_min # -6ft/s + v = 300. # ft/s + gamma = np.arcsin(h_dot/v) + self.buffer_vertices[3] = self.s(np.array([v, gamma, h])) + + ###################### Vertex 5 ################################# + h = -self.y_min # 50ft/s + h_dot = -self.y_dot_max # -100ft/s + v = 450. # ft/s + gamma = np.arcsin(h_dot/v) + self.buffer_vertices[4] = self.s(np.array([v, gamma, h])) + + ###################### Vertex 6 ################################# + h = -self.y_min # 50ft/s + h_dot = -self.y_dot_max # -100ft/s + v = 500. # ft/s + gamma = np.arcsin(h_dot/v) + self.buffer_vertices[5] = self.s(np.array([v, gamma, h])) + + + + def _epsilon_partition(self, agent=None, state_norm=None): + """Since the Shuttle dynamics are highly nonlinear, the approximation measure + epsilon takes a value too large for the POLICEd condition. + Here we partition the buffer into smaller polytopic regions with each their own + approximation measure epsilon. + If agent is provided, instead of using any admissible control inputs, we only use + the provided policy to calculate epsilon. It should reduce epsilon. + We need state_norm to call the agent policy. + Buffer is split along s_2. 
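+        The s_2 interval [s2_min, y_dot_max] is split into nb_buffer_partitions
+        sub-intervals whose breakpoints accumulate towards y_dot_max (each new
+        breakpoint sits 70% of the way from the previous one to y_dot_max), since
+        epsilon grows fastest at high y_dot. Each sub-buffer gets its own
+        least-squares affine fit, self.eps is the largest residual over all
+        sub-buffers, and the breakpoint vertices are stored in self.extra_vertices.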
+ Returns the maximal epsilon and all extra epsilons""" + + ### epsilon increases faster at high y_dot, so make partitions smaller + s2_pts = [self.s2_min, self.y_dot_max] + ratio = 0.7 + for _ in range(self.nb_buffer_partitions-1): + s2_pts.insert(-1, s2_pts[-1]*ratio + (1-ratio)*s2_pts[-2]) + s2_pts = np.array(s2_pts) + + self.eps = 0 + for i in range(self.nb_buffer_partitions): # iterates over the sub-buffers + s2_min = s2_pts[i] + s2_max = s2_pts[i+1] + s1_max = (s2_min - self.beta_offset)/(-self.beta) + eps = self._multi_epsilon(s1_max, s2_min, s2_max, agent, state_norm) + if eps > self.eps: + self.eps = eps + + extra_V = [] + for i in range(1, self.nb_buffer_partitions): + extra_V.append(np.array([self.y_min, s2_pts[i], self.s3_min])) + extra_V.append(np.array([self.y_min, s2_pts[i], self.s3_max])) + s1 = (s2_pts[i] - self.beta_offset)/(-self.beta) + extra_V.append(np.array([s1, s2_pts[i], self.s3_min])) + extra_V.append(np.array([s1, s2_pts[i], self.s3_max])) + + self.extra_vertices = np.array(extra_V) + + + def _multi_epsilon(self, s1_max, s2_min, s2_max, agent=None, state_norm=None): + """Calculates the approximation measure epsilon for a given polytope. + If agent is provided, instead of using any admissible control inputs, we only use + the provided policy to calculate epsilon. It should reduce epsilon. + We need state_norm to call the agent policy.""" + + nb_steps = 10 + + ### Range of states s1 + s1_min = self.y_min + s1_step = (s1_max - s1_min)/nb_steps + s1_range = np.arange(start = s1_min, + stop = s1_max + s1_step/2, + step = s1_step) + ### Range of states s2 + s2_step = (s2_max - s2_min)/nb_steps + s2_range = np.arange(start = s2_min, + stop = s2_max + s2_step/2, + step = s2_step) + ### Range of states s3 + s3_step = (self.s3_max - self.s3_min)/nb_steps + s3_range = np.arange(start = self.s3_min, + stop = self.s3_max + s3_step/2, + step = s3_step) + + action_step = 2*self.action_max/nb_steps + action_range = np.arange(start = -self.action_max, + stop = self.action_max + action_step/2, + step = action_step) + + ### Storing datapoints to perform linear regression on + N = (nb_steps+1)**(self.state_size + self.action_size) + States = np.zeros((N, self.state_size)) + Next_States = np.zeros((N, self.state_size)) + Actions = np.zeros((N, self.action_size)) + + i = 0 + for s1 in s1_range: + for s2 in s2_range: + if s2 > self.beta_offset - self.beta * s1: break # go to next s1 + + for s3 in s3_range: + s = np.array([s1, s2, s3]) + if agent is not None: + with torch.no_grad(): + action_range = agent.evaluate(s, state_norm) + + for action in action_range: + self.s_reset_to(s) + next_s, _, done, _ = self.step(np.array([action])) + if not done: # ignore crash states + States[i] = s + Actions[i] = action + Next_States[i] = next_s + i += 1 + + #### Least square fit + Ones = np.ones((i, 1)) + A = np.concatenate((States[:i], Actions[:i], Ones), axis=1) + y_r = (Next_States[:i, self.relative_degree-1] - States[:i, self.relative_degree-1])/self.dt + x, _, _, _ = np.linalg.lstsq(A, y_r, rcond=None) + + ### Calculation of epsilon + CA = np.array([[x[0], x[1], x[2]]]) + CB = x[3] + Cc = x[4] + affine_pred = States[:i] @ CA.T + Actions[:i] * CB.T + Cc + eps = abs(y_r - affine_pred.squeeze()).max() + return eps + + + + + \ No newline at end of file diff --git a/SpaceShuttle/comparison.py b/SpaceShuttle/comparison.py new file mode 100644 index 0000000..84be820 --- /dev/null +++ b/SpaceShuttle/comparison.py @@ -0,0 +1,368 @@ +# -*- coding: utf-8 -*- +""" +Created on Tue Mar 19 13:24:01 2024 + 
+@author: Jean-Bapsite Bouvier + +Comparison of the trained POLICEd and baseline models on the +high relative degree Shuttle Landing. +Creates the phase portrait of the shuttle landing for the baseline and the POLICEd policies. + + x_state (used for dynamics is an internal state) + | Num | Name | Observation | Min | Max | Unit | + | --- | ----- | -------------------| ----- | ---- | -----| + | 0 | v | velocity | 0 | Inf | m/s | + | 1 | gamma | flight path angle | -pi/2 | pi/2 | rad | + | 2 | h | altitude | 0 | Inf | m/s | + + s_state (the external state used for the buffer composed of the output iterated derivatives) + | Num | Name | Observation | Min | Max | Unit| + | --- | ----- | ----------------------------------| ----- | ---- |-----| + | 0 | y | output = -h | -Inf | Inf | m | + | 1 | y' | output derivative = -v sin(gamma) | -Inf | Inf | m/s | + | 2 | gamma | flight path angle | -pi/2 | pi/2 | rad | +""" + +import torch +import numpy as np +from utils import load, nice_plot +import matplotlib.pyplot as plt +import matplotlib.lines as mlines + + +# wide = True # plot the whole trajectories +wide = False # plot zoomed in trajectories showing the buffer and baseline diverging + +#%% Extra function + +def add_arrow(line, position=None, direction='right', size=15, arrow_len = 7): + """ + add an arrow to a line. + + line: Line2D object + position: x-position of the arrow. If None, mean of xdata is taken + direction: 'left' or 'right' + size: size of the arrow in fontsize points + + Example use: + t = np.linspace(-2, 2, 100) + y = np.sin(t) + line = plt.plot(t, y)[0] + add_arrow(line) + """ + + color = line.get_color() + + xdata = line.get_xdata() + ydata = line.get_ydata() + + if position is None: + position = xdata.mean() + # find closest index + start_ind = np.argmin(np.absolute(xdata - position)) + if direction == 'right': + end_ind = start_ind + 1 + else: + end_ind = start_ind - 1 + + length = (xdata[end_ind] - xdata[start_ind])**2 + (ydata[end_ind] - ydata[start_ind])**2 + ratio = arrow_len/np.sqrt(length) + x_end = xdata[end_ind]*ratio + xdata[start_ind]*(1-ratio) + y_end = ydata[end_ind]*ratio + ydata[start_ind]*(1-ratio) + + line.axes.annotate('', + xytext=(xdata[start_ind], ydata[start_ind]), + xy=(x_end, y_end), + arrowprops=dict(arrowstyle="wedge", color=color), # arrowstyle="->" style + size=size + ) + + +#%% Load the trained models + +POLICEd_args, POLICEd_state_norm, POLICEd_env, POLICEd_agent = load("saved/POLICEd_128") +base_args, base_state_norm, env, base_agent = load("saved/baseline") + + +#%% Compare their rollouts + +######### x state : v [ft/s] gamma [rad] h [ft] + + +# initial_x_states = np.array([[env.v_0_min, env.gamma_0_min, env.h_0_min], +# # [env.v_0_min, env.gamma_0_min, env.h_0_max], # h_0_min = h_0_max +# [env.v_0_min, env.gamma_0_max, env.h_0_min], +# # [env.v_0_min, env.gamma_0_max, env.h_0_max], +# [env.v_0_max, env.gamma_0_min, env.h_0_min], +# # [env.v_0_max, env.gamma_0_min, env.h_0_max], +# [env.v_0_max, env.gamma_0_max, env.h_0_min]]) +# # [env.v_0_max, env.gamma_0_max, env.h_0_max]]) + +if wide: + initial_x_states = np.array([[370., -10*np.pi/180, 500.], + [400., -20*np.pi/180, 500.], + [450., -30*np.pi/180, 500.], + [500., -40*np.pi/180, 500.]]) +else: + ### Crashing the baseline but not POLICEd + initial_x_states = np.array([[330., -23*np.pi/180, 500.], + [350., -30*np.pi/180, 500.], + [400., -45*np.pi/180, 500.], + [450., -55*np.pi/180, 500.], + [500., -57.5*np.pi/180, 500.]]) + + +def multi_rollouts(args, env, agent, state_norm, initial_x_states, 
title=""): + """Rollouts a trajectory from the policy""" + + N = initial_x_states.shape[0] + s_Trajs = [] + x_Trajs = [] + Actions = [] + + for i in range(N): + env.x_reset_to(initial_x_states[i]) + + s_Trajectory = np.zeros((env.max_episode_steps+1, env.state_size)) + x_Trajectory = np.zeros((env.max_episode_steps+1, env.state_size)) + s_Trajectory[0] = env.s_state + x_Trajectory[0] = initial_x_states[i] + actions = np.zeros((env.max_episode_steps, env.action_size)) + + for t in range(env.max_episode_steps): + with torch.no_grad(): + action = agent.evaluate(env.s_state, state_norm) + s_state, reward, done, _ = env.step(action) + s_Trajectory[t+1] = s_state + x_Trajectory[t+1] = env.x_state + actions[t] = action + if done: break + if not env.in_landing_box(): + v0, gamma0, h0 = initial_x_states[i] + h_dot = s_state[1] + print(title + f" rollout from v = {v0:.0f} ft/s, gamma = {gamma0*180/np.pi:.1f} deg, h = {h0:.0f} ft crashes with vertical velocity {h_dot:.0f} ft/s") + + s_Trajs.append(s_Trajectory[:t+2]) + x_Trajs.append(x_Trajectory[:t+2]) + Actions.append(actions[:t+1]) + + return s_Trajs, x_Trajs, Actions + +POLICEd_s_Trajs, POLICEd_x_Trajs, POLICEd_Actions = multi_rollouts(POLICEd_args, POLICEd_env, POLICEd_agent, POLICEd_state_norm, initial_x_states, title="POLICEd") + +base_s_Trajs, base_x_Trajs, base_Actions = multi_rollouts(base_args, env, base_agent, base_state_norm, initial_x_states, title="Baseline") + + + + + +fig, ax = nice_plot() +plt.title("test plot") +plt.show() + + +#%% Phase portrait plotting + +fig, ax = nice_plot() + +buffer_color = (144./255, 238./255, 144./255) +start_color = (0.2, 0.2, 0.2) +landing_color = (255./255, 192./255, 203./255) +POLICEd_color = '#1f77b4' +base_color = '#ff7f0e' + +plt.title("Phase portrait") +buffer_2D_vertices = [[-env.y_min, -env.s2_min], + [-env.y_max, -env.s2_min], + [-env.y_min, -env.y_dot_max]] +buffer = plt.Polygon(xy=buffer_2D_vertices, color=buffer_color) +ax.add_patch(buffer) + +landing_zone_vertices = [[-env.y_min, -env.s2_min], + [-env.y_max, -env.s2_min], + [-env.y_max, 0], + [-env.y_min, 0.]] +landing_zone = plt.Polygon(xy=landing_zone_vertices, color=landing_color) +ax.add_patch(landing_zone) +plt.xlabel("Altitude $h$ (ft)") +plt.ylabel("Vertical speed $\dot h$ (ft/s)") + +##### Trajectories +max_h_dot = 0. +min_h_dot = -300. 
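+### For each initial condition, plot the baseline (dotted) and POLICEd (solid)
+### trajectories in the (h, h_dot) plane. The baseline touchdown speed at h = 0
+### is linearly interpolated between its last two states and marked with a red X.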
+N = len(base_s_Trajs) +for i in range(N): + ### Baseline + line = plt.plot(-base_s_Trajs[i][:, 0], -base_s_Trajs[i][:, 1], color=base_color, linestyle=':', linewidth=3)[0] + if wide: + add_arrow(line, position=200, size=30, arrow_len = 25) + else: + add_arrow(line, position=40, size=30) # for zoomed in plot + ### Interpolate last state at h=0 + x1 = -base_s_Trajs[i][-1, 0] + x2 = -base_s_Trajs[i][-2, 0] + y1 = -base_s_Trajs[i][-1, 1] + y2 = -base_s_Trajs[i][-2, 1] + r = x2/(x2 - x1) + y = r*y1 + (1-r)*y2 + plt.scatter(0, y, s=100, color="red", marker="X", zorder=3) + # plt.scatter(-base_s_Trajs[i][0, 0], -base_s_Trajs[i][0, 1], s=30, color=start_color, zorder=3) + + for t in range(base_s_Trajs[i].shape[0]): + h_dot = -base_s_Trajs[i][t, 1] + if h_dot < min_h_dot: + min_h_dot = h_dot + elif h_dot > max_h_dot: + max_h_dot = h_dot + + ### POLICEd + line = plt.plot(-POLICEd_s_Trajs[i][:, 0], -POLICEd_s_Trajs[i][:, 1], color=POLICEd_color, linewidth=3)[0] + if wide: + add_arrow(line, position=400, size=30, arrow_len = 25) + else: + add_arrow(line, position=61, size=30) # for zoomed in plot + + + for t in range(POLICEd_s_Trajs[i].shape[0]): + h_dot = -POLICEd_s_Trajs[i][t,1] + if h_dot < min_h_dot: + min_h_dot = h_dot + elif h_dot > max_h_dot: + max_h_dot = h_dot + +###### plt.plot(np.array([env.buffer_max[1], env.buffer_max[1]]), np.array([min_h_dot, max_h_dot]), color=constraint_color, linestyle='--', linewidth=4) +crash_marker = mlines.Line2D([], [], color="red", marker='X', linestyle='None', markersize=10, label='crash') +buffer_marker = mlines.Line2D([], [], color=buffer_color, marker='^', linestyle='None', markersize=20, label='buffer $\mathcal{B}$') +landing_marker = mlines.Line2D([], [], color=landing_color, marker='s', linestyle='None', markersize=15, label='target') +base_marker = mlines.Line2D([], [], color=base_color, linewidth='4', markersize=15, linestyle=':', label='baseline') +POLICEd_marker = mlines.Line2D([], [], color=POLICEd_color, linewidth='4', markersize=15, label='POLICEd') + +if wide: + ### Full view + ax.set_yticks([0, -100, -200]) + ax.set_xticks([0, 50, 500]) + plt.xlim([-0.1, env.h_0_max*1.01]) + plt.ylim([min_h_dot*1.01, max_h_dot]) + plt.legend(handles=[landing_marker, buffer_marker, base_marker, POLICEd_marker], + frameon=False, + borderpad=.0, labelspacing=0.3, handletextpad=0.5, handlelength=1.4, + loc='lower left') + # plt.savefig("figures/shuttle_wide.eps", bbox_inches='tight', format="eps", dpi=1200) + + +else: + ### Zoomed in + ax.set_yticks([0, -100, -200]) + ax.set_xticks([0, 50, 100]) + plt.xlim([-1.5, 100]) + plt.ylim([-150, 0]) + plt.legend(handles=[buffer_marker, landing_marker, base_marker, crash_marker, POLICEd_marker], + frameon=False, + borderpad=.0, # fractional whitespace inside the legend border, in font-size units + labelspacing=0.3, # vertical space between the legend entries, in font-size units + handletextpad=0.4, # pad between the legend handle and text, in font-size units + handlelength=1.2, # length of the legend handles, in font-size units + loc='upper right') + + # plt.savefig("figures/shuttle_multi_phase.eps", bbox_inches='tight', format="eps", dpi=1200) + # plt.savefig("figures/shuttle_phase_base_POLICEd.svg", bbox_inches='tight') + +plt.show() + + + + + + + + + + + + + +#%% Trajectories plotting + +# N = initial_x_states.shape[0] +N = 3 + + +fig, ax = nice_plot() +for i in range(N): + time = env.dt * np.arange(POLICEd_x_Trajs[i].shape[0]) + plt.plot(time, POLICEd_x_Trajs[i][:, 0], color=POLICEd_color, linewidth=3) + + time = env.dt 
* np.arange(base_x_Trajs[i].shape[0]) + plt.plot(time, base_x_Trajs[i][:, 0], color=base_color, linestyle=':', linewidth=3) + +plt.ylabel("velocity $v$ (ft/s)") +plt.xlabel("time (s)") +plt.legend(handles=[base_marker, POLICEd_marker], frameon=False, borderpad=.0, + labelspacing=0.3, handletextpad=0.4, handlelength=1.2, loc='upper right') + +# plt.savefig("figures/shuttle_v.eps", bbox_inches='tight', format="eps", dpi=1200) +plt.savefig("figures/shuttle_v.svg", bbox_inches='tight') + +plt.show() + + + +fig, ax = nice_plot() +for i in range(N): + time = env.dt * np.arange(POLICEd_x_Trajs[i].shape[0]) + plt.plot(time, POLICEd_x_Trajs[i][:, 1]*180/np.pi, color=POLICEd_color, linewidth=3) + + time = env.dt * np.arange(base_x_Trajs[i].shape[0]) + plt.plot(time, base_x_Trajs[i][:, 1]*180/np.pi, color=base_color, linestyle=':', linewidth=3) + +plt.ylabel("flight path angle $\gamma$ (deg)") +plt.xlabel("time (s)") +plt.legend(handles=[base_marker, POLICEd_marker], frameon=False, borderpad=.0, + labelspacing=0.3, handletextpad=0.4, handlelength=1.2, loc='lower right') +# plt.savefig("figures/shuttle_gamma.eps", bbox_inches='tight', format="eps", dpi=1200) +plt.savefig("figures/shuttle_gamma.svg", bbox_inches='tight') +plt.show() + + + +fig, ax = nice_plot() +for i in range(N): + time = env.dt * np.arange(POLICEd_x_Trajs[i].shape[0]) + plt.plot(time, POLICEd_x_Trajs[i][:, 2], color=POLICEd_color, linewidth=3) + + time = env.dt * np.arange(base_x_Trajs[i].shape[0]) + plt.plot(time, base_x_Trajs[i][:, 2], color=base_color, linestyle=':', linewidth=3) + +plt.ylabel("altitude $h$ (ft)") +plt.xlabel("time (s)") +plt.legend(handles=[base_marker, POLICEd_marker], frameon=False, borderpad=.0, + labelspacing=0.3, handletextpad=0.4, handlelength=1.2, loc='upper right') +# plt.savefig("figures/shuttle_h.eps", bbox_inches='tight', format="eps", dpi=1200) +# plt.savefig("figures/shuttle_h.svg", bbox_inches='tight') +plt.show() + + + + +fig, ax = nice_plot() +for i in range(N): + time = env.dt * np.arange(POLICEd_Actions[i].shape[0]) + Alpha = env.action_to_alpha(POLICEd_Actions[i]) + plt.plot(time, Alpha, color=POLICEd_color, linewidth=3) + + time = env.dt * np.arange(base_Actions[i].shape[0]) + Alpha = env.action_to_alpha(base_Actions[i]) + plt.plot(time, Alpha, color=base_color, linestyle=':', linewidth=3) + +plt.ylabel("angle of attack $\\alpha$ (deg)") +plt.xlabel("time (s)") +plt.legend(handles=[base_marker, POLICEd_marker], frameon=False, borderpad=.0, + labelspacing=0.3, handletextpad=0.4, handlelength=1.2, loc='lower right') +# plt.savefig("figures/shuttle_alpha.eps", bbox_inches='tight', format="eps", dpi=1200) +# plt.savefig("figures/shuttle_alpha.svg", bbox_inches='tight') +plt.show() + + + + + diff --git a/SpaceShuttle/main.py b/SpaceShuttle/main.py new file mode 100644 index 0000000..d547fcb --- /dev/null +++ b/SpaceShuttle/main.py @@ -0,0 +1,227 @@ +# -*- coding: utf-8 -*- +""" +Created on Sat Feb 24 9:15:08 2024 + +@author: Jean-Baptiste Bouvier + +Space Shuttle Landing main with PPO for high relative degree constraint +Goal: learn to land the shuttle softly + +Environment has 2 equivalent states: +x_state = [v gamma h] +s_state = [y y' gamma] iterated output derivatives + +PPO agent works with the s_state because the buffer is in s-representation + +Space Shuttle system taken from +"Optimal Online Path Planning for Approach and Landing Guidance" +by Ali Heydari and S. N. 
Balakrishnan + +PPO implementation from +https://github.com/Lizhi-sjtu/DRL-code-pytorch/tree/main/5.PPO-continuous + +""" + +import copy +import torch +import argparse +import numpy as np +import matplotlib.pyplot as plt + +from PPO import PPO +from SpaceShuttle import ShuttleEnv +from utils import ReplayBuffer, data, modified_training, load, save, rollout, nice_plot +from normalization import Normalization, RewardScaling + + +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + +#%% Hyperparameters + +loading = True +# warm_starting = False +# loading = False +warm_starting = True + +parser = argparse.ArgumentParser("Hyperparameters Setting for PPO-continuous") +parser.add_argument("--max_train_steps", type=int, default=int(5e6), help="Maximum number of training steps") +parser.add_argument("--evaluate_freq", type=float, default=1e5, help="Evaluate the policy every 'evaluate_freq' steps") +parser.add_argument("--policy_dist", type=str, default="Gaussian", help="Gaussian") # or Beta") +parser.add_argument("--batch_size", type=int, default=4096, help="Batch size") +parser.add_argument("--mini_batch_size", type=int, default=128, help="Minibatch size") +parser.add_argument("--hidden_width", type=int, default=128, help="The number of neurons in hidden layers of the neural network") +parser.add_argument("--lr_a", type=float, default=3e-4, help="Learning rate of actor") +parser.add_argument("--lr_c", type=float, default=3e-4, help="Learning rate of critic") +parser.add_argument("--gamma", type=float, default=0.99, help="Discount factor") +parser.add_argument("--lamda", type=float, default=0.95, help="GAE parameter") +parser.add_argument("--epsilon", type=float, default=0.2, help="PPO clip parameter") +parser.add_argument("--K_epochs", type=int, default=10, help="PPO parameter") +parser.add_argument("--use_adv_norm", type=bool, default=True, help="Trick 1:advantage normalization") +parser.add_argument("--use_state_norm", type=bool, default=True, help="Trick 2: state normalization") +parser.add_argument("--use_reward_scaling", type=bool, default=True, help="Trick 4: reward scaling") +parser.add_argument("--entropy_coef", type=float, default=0.01, help="Trick 5: policy entropy") +parser.add_argument("--use_lr_decay", type=bool, default=True, help="Trick 6: learning rate Decay") +parser.add_argument("--use_grad_clip", type=bool, default=True, help="Trick 7: Gradient clip") +parser.add_argument("--set_adam_eps", type=bool, default=True, help="Trick 9: set Adam epsilon=1e-5") +parser.add_argument("--seed", type=int, default=10, help="Common seed for all environments") + +### POLICEd additions +parser.add_argument("--POLICEd", type=bool, default=True, help= "Whether the layers are POLICEd or linear") +parser.add_argument("--enlarging_buffer", type=bool, default=True, help= "Whether the buffer starts from a volume 0 and increases once policy has converged") +parser.add_argument("--max_iter_enlargment", type=int, default=int(1e4), help="Number of enlarging iterations for the buffer") + +parser.add_argument("--y_min", type=float, default=-50., help="start of the buffer in y") +parser.add_argument("--y_max", type=float, default=0., help="end of the buffer in y, constraint line") +parser.add_argument("--y_dot_max", type=float, default=100., help="maximal output velocity allowed in buffer") +parser.add_argument("--s2_min", type=float, default=6., help="minimal output velocity in the buffer") +### Gamma must be negative to have negative vertical velocity since v > 0 +args = 
parser.parse_args() +args.freq_reset_in_buffer = 1000 +args.nb_buffer_partitions = 1 + +# args.POLICEd = False + +if args.POLICEd: + filename = "saved/POLICEd_far" + # filename = "saved/POLICEd_128" + # filename = "saved/POLICEd_Buffer2" +else: + filename = "saved/base" + +### Set random seed +np.random.seed(args.seed) +torch.manual_seed(args.seed) + +### Environment +env = ShuttleEnv(args) +env_eval = ShuttleEnv(args) +print(f"epsilon = {env.eps:.1f}") + +### Add new constants to the arguments +args.buffer_vertices = env.buffer_vertices +args.state_dim = env.state_size +args.action_dim = env.action_size +args.max_action = env.action_max + +training_data = data() + +assert args.use_reward_scaling +reward_scaling = RewardScaling(shape=1, gamma=args.gamma) +reward_scaling.running_ms.n = 10000 +reward_scaling.running_ms.mean = np.array([-100]) + + +#%% Loading pretrained + + +if warm_starting: + warm_start_filename = "saved/POLICEd_128" #"saved/base" + loaded_args, loaded_state_norm, loaded_env, loaded_agent = load(warm_start_filename) + _, _, _ = rollout(env, loaded_agent, loaded_state_norm, plot=True, title="Loaded") + + agent = PPO(args) # new agent to be warmstarted at the loaded model + # Initialize the state normalization as in the loaded model + state_norm = Normalization(shape=args.state_dim) + state_norm = copy.deepcopy(loaded_state_norm) + agent.load(warm_start_filename) + _, _, _ = rollout(env, agent, state_norm, plot=True, title="Pretrained") + +elif loading: + _, state_norm, _, agent = load(filename) + + agent.actor.buffer_vertices = torch.tensor(env.buffer_vertices, dtype=torch.float32) + agent.actor.buffer_center = torch.ones((agent.actor.num_vertices, 1)) @ agent.actor.buffer_vertices.mean(axis=0, keepdims=True) + + if agent.actor.iter >= agent.actor.max_iter: # buffer reached full size + agent.actor.vertices = agent.actor.buffer_vertices.clone() + elif agent.actor.iter == 0: + agent.actor.vertices = agent.actor.buffer_center.clone() + + _, _, _ = rollout(env, agent, state_norm, plot=True, title="Loaded") + +else: # cold start + print("Cold start") + agent = PPO(args) # new agent + # Initialize the state normalization as in the loaded model + state_norm = Normalization(shape=args.state_dim) + s = env.s(env.x_0_min) + s = state_norm(torch.tensor(s).unsqueeze(0), update=True) + + # Pretraining of the actor based on the loaded + # Supervised learning of the POLICEd network based on the baseline + + import random + def partition (list_in, n): + random.shuffle(list_in) + return [list_in[i::n] for i in range(n)] + + warm_start_filename = "saved/base" + loaded_args, loaded_state_norm, loaded_env, loaded_agent = load(warm_start_filename) + + opt = torch.optim.Adam(agent.actor.parameters(), lr=1e-5) + state_norm = copy.deepcopy(loaded_state_norm) + nb_test_trajs = 1000 + nb_repeats = 10 + mini_batch_len = 32 + Losses = [] + + for _ in range(nb_test_trajs): + + s_Trajectory, x_Trajectory, Actions = rollout(env, loaded_agent, loaded_state_norm, plot=False) + + actions = torch.tensor(Actions) + s_Traj = torch.tensor(s_Trajectory) + N = Actions.shape[0] # number of training data points + nb_mini_batches = N//mini_batch_len + + for repeats in range(nb_repeats): + # creates random mini batches to split the trajectory + batches = partition(list(range(N)), nb_mini_batches) + + for id_batch in range(nb_mini_batches): + batch = batches[id_batch] + pred = agent.actor(s_Traj[batch], state_norm) + loss = ((pred - actions[batch])**2).mean() + opt.zero_grad() + loss.backward() + opt.step() + 
Losses.append(loss.item()) + + fig, ax = nice_plot() + plt.title("Pretraining loss") + plt.plot(np.arange(len(Losses)), Losses) + plt.show() + + + _, _, _ = rollout(env, agent, state_norm, plot=True, title="Pretrained") + + +#%% +replay_buffer = ReplayBuffer(args) +trained = modified_training(args, env, agent, replay_buffer, env_eval, state_norm, reward_scaling, training_data) + + +#%% Training with whole trajectories and reward given afterward for the whole trajectory + +trained = False +replay_buffer = ReplayBuffer(args) + +while not trained: + + trained = modified_training(args, env, agent, replay_buffer, env_eval, state_norm, reward_scaling, training_data) + if args.POLICEd: + stable, respect = constraint_training(args, env, agent, replay_buffer, env_eval, state_norm, reward_scaling, training_data) + trained = stable and respect # POLICEd is only trained when achieves reward (stable) and respect repulsion condition + + +#%% Saving trained agent + +# save("saved/POLICEd_128", args, state_norm, agent) +# save(filename, args, state_norm, agent) +# save("saved/baseline", args, state_norm, agent) + + + + + + diff --git a/SpaceShuttle/normalization.py b/SpaceShuttle/normalization.py new file mode 100644 index 0000000..8960642 --- /dev/null +++ b/SpaceShuttle/normalization.py @@ -0,0 +1,60 @@ +""" +Normalization functions taken from the PPO implementation of +https://github.com/Lizhi-sjtu/DRL-code-pytorch/tree/main/5.PPO-continuous +""" + +import torch +import numpy as np + + +class RunningMeanStd: + # Dynamically calculate mean and std + def __init__(self, shape): # shape: the dimension of input data + self.n = 0 + self.mean = np.zeros(shape) + self.S = np.zeros(shape) + self.std = np.sqrt(self.S) + + def update(self, x): + x = np.array(x) + self.n += 1 + if self.n == 1: + self.mean = x + self.std = x + else: + old_mean = self.mean.copy() + self.mean = old_mean + (x - old_mean) / self.n + self.S = self.S + (x - old_mean) * (x - self.mean) + self.std = np.sqrt(self.S / self.n) + + +class Normalization: + def __init__(self, shape): + self.running_ms = RunningMeanStd(shape=shape) + + def __call__(self, x, update=True): + # The mean and std are not updated during the evaluation + if update and x.shape[0] == 1: + self.running_ms.update(x) + x = (x - self.running_ms.mean) / (self.running_ms.std + 1e-8) + if torch.is_tensor(x): + if x.dtype != torch.float32: + x = x.float() + return x + + +class RewardScaling: + def __init__(self, shape, gamma): + self.shape = shape # reward shape=1 + self.gamma = gamma # discount factor + self.running_ms = RunningMeanStd(shape=self.shape) + self.R = np.zeros(self.shape) + + def __call__(self, x): + self.R = self.gamma * self.R + x + self.running_ms.update(self.R) + x = x / (self.running_ms.std + 1e-8) # Only divided std + return x + + def reset(self): # When an episode is done, we should reset 'self.R' + self.R = np.zeros(self.shape) diff --git a/SpaceShuttle/utils.py b/SpaceShuttle/utils.py new file mode 100644 index 0000000..44258f7 --- /dev/null +++ b/SpaceShuttle/utils.py @@ -0,0 +1,608 @@ +# -*- coding: utf-8 -*- +""" +Created on Mon Nov 13 07:52:25 2023 + +@author: Jean-Baptiste Bouvier + +Function utils for the PPO POLICEd applied on the Space Shuttle environmnent. 
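+Provides the ReplayBuffer, policy evaluation and rollout helpers, the plotting
+utilities (phase portraits and trajectory plots), the training loops, and the
+save/load functions for trained agents.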
+""" + +import warnings +warnings.simplefilter("ignore", SyntaxWarning) + + +import copy +import torch +import numpy as np +import matplotlib.pyplot as plt +import matplotlib.lines as mlines + +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + + + +#%% Training utils + +class ReplayBuffer: + """Store experiences. The sampling from this buffer is handled by the update function + of PPO, where each experience is sampled as part of mini-batches.""" + def __init__(self, args): + self.s = np.zeros((args.batch_size, args.state_dim)) + self.a = np.zeros((args.batch_size, args.action_dim)) + self.a_logprob = np.zeros((args.batch_size, args.action_dim)) + self.r = np.zeros((args.batch_size, 1)) + self.s_ = np.zeros((args.batch_size, args.state_dim)) + self.dw = np.zeros((args.batch_size, 1)) + self.done = np.zeros((args.batch_size, 1)) + self.count = 0 + + def store(self, s, a, a_logprob, r, s_, dw, done): + self.s[self.count] = s + self.a[self.count] = a + self.a_logprob[self.count] = a_logprob + self.r[self.count] = r + self.s_[self.count] = s_ + self.dw[self.count] = dw + self.done[self.count] = done + self.count += 1 + + def numpy_to_tensor(self): + s = torch.tensor(self.s[:self.count], dtype=torch.float) + a = torch.tensor(self.a[:self.count], dtype=torch.float) + a_logprob = torch.tensor(self.a_logprob[:self.count], dtype=torch.float) + r = torch.tensor(self.r[:self.count], dtype=torch.float) + s_ = torch.tensor(self.s_[:self.count], dtype=torch.float) + dw = torch.tensor(self.dw[:self.count], dtype=torch.float) + done = torch.tensor(self.done[:self.count], dtype=torch.float) + + return s, a, a_logprob, r, s_, dw, done + + +def evaluate_policy(args, env, agent, state_norm): + """Evaluates the policy, returns the average reward and whether the + reward threshold has been met""" + + good_policy = True + if env.initial_range: + initial_x_states = np.array([[env.v_0_min, env.gamma_0_min, env.h_0_min], + [env.v_0_min, env.gamma_0_min, env.h_0_max], + [env.v_0_min, env.gamma_0_max, env.h_0_min], + [env.v_0_min, env.gamma_0_max, env.h_0_max], + [env.v_0_max, env.gamma_0_min, env.h_0_min], + [env.v_0_max, env.gamma_0_min, env.h_0_max], + [env.v_0_max, env.gamma_0_max, env.h_0_min], + [env.v_0_max, env.gamma_0_max, env.h_0_max]]) + else: + initial_x_states = np.array([env.x_0_min*0.99, env.x_0_min*1.01]) + + N = initial_x_states.shape[0] + worst_reward = 10000. 
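+    # Deterministic rollout from each test initial state: keep the worst final
+    # reward and require every rollout to end inside the landing box.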
+ for i in range(N): + s = env.x_reset_to(initial_x_states[i]) + done = False + while not done: + with torch.no_grad(): + action = agent.evaluate(s, state_norm) # We use the deterministic policy during the evaluating + s, _, done, _ = env.step(action) + reward = env.final_reward() + if worst_reward > reward: + worst_reward = reward + good_policy *= env.in_landing_box() # needs it to be true for each rollout + + return worst_reward, good_policy + + + +def nice_plot(): + """Makes the plot nice""" + fig = plt.gcf() + ax = fig.gca() + plt.rcParams.update({'font.size': 16}) + plt.rcParams['font.sans-serif'] = ['Palatino Linotype'] + ax.spines['bottom'].set_color('w') + ax.spines['top'].set_color('w') + ax.spines['right'].set_color('w') + ax.spines['left'].set_color('w') + + return fig, ax + + + +def phase_portrait_y(env, s_Traj, title=""): + """Plots the phase portrait (y, y') """ + + fig, ax = nice_plot() + + # Colors + buffer_color = (0., 1., 0., 0.5) + start_color = (0.2, 0.2, 0.2) + traj_color = '#1f77b4' + + plt.title("Phase portrait " + title) + # Buffer + buffer_2D_vertices = [[env.y_min, env.s2_min], + [env.y_max, env.s2_min], + [env.y_min, env.y_dot_max]] + buffer = plt.Polygon(xy=buffer_2D_vertices, color=buffer_color) + ax.add_patch(buffer) + + # Trajectory + plt.plot(s_Traj[:, 0], s_Traj[:, 1], color=traj_color, linewidth=3) + plt.scatter(s_Traj[0, 0], s_Traj[0, 1], s=30, color=start_color, zorder=3) + + plt.xlabel("Output $y$") + plt.ylabel("Output speed $\dot y$") + + start_marker = mlines.Line2D([], [], color=start_color, marker='o', linestyle='None', markersize=10, label='start') + traj_marker = mlines.Line2D([], [], color=traj_color, linewidth='3', markersize=10, label='trajectory') + buffer_marker = mlines.Line2D([], [], color=buffer_color, marker='^', linestyle='None', markersize=20, label='affine buffer $\mathcal{B}$') + plt.legend(handles=[start_marker, traj_marker, buffer_marker], frameon=False, + borderpad=.0, loc="lower left", + labelspacing=0.3, handletextpad=0.5, handlelength=1.4) + plt.show() + + +def phase_portrait_h(env, s_Traj, title=""): + """Plots the phase portrait (h, h dot) """ + + fig, ax = nice_plot() + + # Colors + buffer_color = (0., 1., 0., 0.5) + start_color = (0.2, 0.2, 0.2) + traj_color = '#1f77b4' + + plt.title("Phase portrait " + title) + # Buffer + buffer_2D_vertices = [[-env.y_min, -env.s2_min], + [-env.y_max, -env.s2_min], + [-env.y_min, -env.y_dot_max]] + buffer = plt.Polygon(xy=buffer_2D_vertices, color=buffer_color) + ax.add_patch(buffer) + + # Trajectory + plt.plot(-s_Traj[:, 0], -s_Traj[:, 1], color=traj_color, linewidth=3) + plt.scatter(-s_Traj[0, 0], -s_Traj[0, 1], s=30, color=start_color, zorder=3) + + plt.xlabel("Altitude $h$ (ft)") + plt.ylabel("Vertical speed $\dot h$ (ft/s)") + + start_marker = mlines.Line2D([], [], color=start_color, marker='o', linestyle='None', markersize=10, label='start') + traj_marker = mlines.Line2D([], [], color=traj_color, linewidth='3', markersize=10, label='trajectory') + buffer_marker = mlines.Line2D([], [], color=buffer_color, marker='^', linestyle='None', markersize=20, label='affine buffer $\mathcal{B}$') + plt.legend(handles=[start_marker, traj_marker, buffer_marker], frameon=False, + borderpad=.0, loc="upper right", + labelspacing=0.3, handletextpad=0.5, handlelength=1.4) + plt.show() + + +def plot_traj(env, x_Traj, Actions=None, title=""): + """Plots the trajectory of the space shuttle. 
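+    Produces separate plots of velocity, flight path angle, altitude and,
+    if Actions is given, angle of attack against time.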
+    x_state = [v  gamma  h]     action = [alpha] """
+
+    N = x_Traj.shape[0]  # number of time steps in the trajectory
+    time = env.dt * np.arange(N)
+
+    fig, ax = nice_plot()
+    plt.title(title)
+    plt.scatter(time, x_Traj[:, 0], s=10)
+    plt.ylabel("Velocity (ft/s)")
+    plt.xlabel("time (s)")
+    plt.show()
+
+    fig, ax = nice_plot()
+    plt.title(title)
+    plt.scatter(time, x_Traj[:, 1]*180/np.pi, s=10)
+    plt.plot([time[0], time[-1]], [0., 0.], c="orange")
+    plt.ylabel("Flight path angle (deg)")
+    plt.xlabel("time (s)")
+    plt.show()
+
+    fig, ax = nice_plot()
+    plt.title(title)
+    plt.scatter(time, x_Traj[:, 2], s=10)
+    plt.ylabel("altitude h (ft)")
+    plt.xlabel("time (s)")
+    plt.show()
+
+    if Actions is not None:
+        Alpha = env.action_to_alpha(Actions)
+        fig, ax = nice_plot()
+        plt.title(title)
+        plt.scatter(time[:-1], Alpha, s=10)
+        plt.ylabel("angle of attack alpha (deg)")
+        plt.xlabel("time (s)")
+        plt.show()
+
+
+
+def training(args, env, agent, replay_buffer, env_eval, state_norm=None, reward_scaling=None, data=None):
+    """Trains the PPO agent until the reward threshold is met or the step budget is exhausted."""
+
+    evaluate_num = 0  # Records the number of evaluations
+    if hasattr(args, 'total_steps'):
+        total_steps = args.total_steps
+    else:
+        total_steps = 0  # Records the total steps during training
+    stable = False  # True once every evaluation rollout reaches the landing box
+    trained = False  # True once stable and, for POLICEd policies, the buffer has reached full size
+    episode = 0
+    buffer_vertices = args.buffer_vertices
+    num_vertices = buffer_vertices.shape[0]
+
+    while not trained:
+        episode += 1
+
+        if episode % 5 == 0:  # regularly reset to a random convex combination of the buffer vertices
+            coefs = np.random.rand(num_vertices)
+            coefs /= coefs.sum()
+            s = env.s_reset_to(coefs @ buffer_vertices)
+        else:
+            s = env.reset()
+
+        if args.use_reward_scaling:
+            reward_scaling.reset()
+        episode_reward = 0
+        done = False
+        while not done:
+            a, a_logprob = agent.choose_action(s, state_norm)  # Action and its log probability
+            s_, r, done, _ = env.step(a)
+            episode_reward += r
+
+            if args.use_reward_scaling:
+                r = reward_scaling(r)
+
+            # done is True both when the episode truly ends (dead or win) and when
+            # max_episode_steps is reached; the two cases must be distinguished.
+            # dw means dead or win: there is no next state s'.
+            # A time-limit truncation, in contrast, does have a next state s'.
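+            # Hypothetical example: an episode ending at step 180 < max_episode_steps
+            # gets dw = True, while one cut off exactly at max_episode_steps gets
+            # dw = False even though done is True.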
+            if done and env.episode_step != env.max_episode_steps:
+                dw = True
+            else:
+                dw = False
+
+            replay_buffer.store(s, a, a_logprob, r, s_, dw, done)
+            s = s_
+            total_steps += 1
+
+            # Update once the number of transitions in the buffer reaches batch_size
+            if replay_buffer.count == args.batch_size:
+                agent.update(replay_buffer, total_steps, state_norm)
+                replay_buffer.count = 0
+
+            # Evaluate the policy every 'evaluate_freq' steps
+            if total_steps % args.evaluate_freq == 0:
+                evaluate_num += 1
+                evaluate_reward, stable = evaluate_policy(args, env_eval, agent, state_norm)
+                print(f"evaluation {evaluate_num} \t reward: {evaluate_reward:.1f}")
+                data.plot()
+                rollout(env_eval, agent, state_norm)
+
+                if stable and args.POLICEd and args.enlarging_buffer and agent.actor.iter == 0:
+                    print("\nStarting to enlarge the buffer")
+                    agent.actor.enlarge_buffer = True
+
+                trained = total_steps > args.max_train_steps or stable
+                if args.POLICEd and args.enlarging_buffer:
+                    trained = trained and agent.actor.iter > agent.actor.max_iter
+
+        r = respect_ratio(args, env_eval, agent, state_norm)
+        data.add(episode_reward, r)
+
+    args.total_steps = total_steps
+    return stable
+
+
+
+
+
+
+def modified_training(args, env, agent, replay_buffer, env_eval, state_norm=None, reward_scaling=None, data=None):
+    """Modified PPO training loop for the Shuttle.
+    The main reward is only granted once the trajectory is complete, so the replay
+    buffer stores entire episodes and env.final_reward() is added to the last
+    transition of each episode.
+    """
+
+    evaluate_num = 0  # Records the number of evaluations
+    if hasattr(args, 'total_steps'):
+        total_steps = args.total_steps
+    else:
+        total_steps = 0  # Records the total steps during training
+    stable = False  # True once every evaluation rollout reaches the landing box
+    trained = False  # True once stable and, for POLICEd policies, the buffer has reached full size
+    episode = 0
+    buffer_vertices = args.buffer_vertices
+    num_vertices = buffer_vertices.shape[0]
+
+    while not trained:
+        episode += 1
+
+        if episode % args.freq_reset_in_buffer == 0:  # regularly reset to a random convex combination of the buffer vertices
+            coefs = np.random.rand(num_vertices)
+            coefs /= coefs.sum()
+            s = env.s_reset_to(coefs @ buffer_vertices)
+        else:
+            s = env.reset()
+
+        done = False
+        while not done:
+            a, a_logprob = agent.choose_action(s, state_norm)  # Action and its log probability
+            s_, r, done, _ = env.step(a)
+            if done and env.episode_step != env.max_episode_steps:
+                dw = True  # dead or win: no next state s'
+            else:
+                dw = False  # time-limit truncation: a next state s' exists
+
+            replay_buffer.store(s, a, a_logprob, r, s_, dw, done)
+            s = s_
+            total_steps += 1
+
+        c = replay_buffer.count - 1  # index of the final step of the episode in replay_buffer
+        replay_buffer.r[c] += env.final_reward()
+        data.add(replay_buffer.r[c], 0)  # repulsion respect is not tracked here, hence the 0
+
+
+        # Update when another full episode would not fit in the buffer
+        if replay_buffer.count + env.max_episode_steps > args.batch_size:
+            agent.update(replay_buffer, total_steps, state_norm)
+            replay_buffer.count = 0
+
+        # Evaluate the policy every 'evaluate_freq' steps
+        if episode % (args.evaluate_freq // env.max_episode_steps) == 0:
+            evaluate_num += 1
+            evaluate_reward, stable = evaluate_policy(args, env_eval, agent, state_norm)
+            print(f"evaluation {evaluate_num} \t reward: {evaluate_reward:.1f}")
+            rollout(env_eval, agent, state_norm, plot=True, title=f"Eval {evaluate_num}")
+            #data.plot()
+
+            if stable and args.POLICEd and args.enlarging_buffer and agent.actor.iter == 0:
+                print("\nStarting to enlarge the buffer")
+                agent.actor.enlarge_buffer = True
+
+            trained = total_steps > args.max_train_steps or stable
+            if args.POLICEd and
args.enlarging_buffer: + trained = trained and agent.actor.iter >= agent.actor.max_iter + + args.total_steps = total_steps + return stable + + + +#%% Utils to verify constraint respect + + +def respect_ratio(args, env, agent, state_norm): + """Empirical constraint verification + Returns a percentage of constraint respect over a griding of the buffer.""" + nb_respect = 0 + + nb_steps = 5 + buffer_max = env.buffer_vertices.max(axis=0) + step = (buffer_max - env.buffer_min)/nb_steps + s1_min, s2_min, s3_min = env.buffer_min + s1_max, s2_max, s3_max = buffer_max + s1_range = np.arange(start = s1_min, stop = s1_max + step[0]/2, step = step[0]) + s2_range = np.arange(start = s2_min, stop = s2_max + step[1]/2, step = step[1]) + s3_range = np.arange(start = s3_min, stop = s3_max + step[2]/2, step = step[2]) + + nb_points = 0 + + for s1 in s1_range: + for s2 in s2_range: + for s3 in s3_range: + s = np.array([s1, s2, s3]) + + if env.in_buffer(s_state = s): + env.s_reset_to(s) + with torch.no_grad(): + action = agent.evaluate(s, state_norm) + _, _, _, repulsion_respect = env.step(action) + nb_points += 1 + if repulsion_respect: + nb_respect += 1 + + return nb_respect/nb_points + + + + +class data: + """class to store the rewards over time and the percentage of constraint respect + during training to plot it""" + def __init__(self): + self.len = 0 + self.reward_list = [] + self.respect_list = [] + + def add(self, reward, respect): + self.len += 1 + self.reward_list.append(reward) + self.respect_list.append(respect) + + def add_respect(self, respect): + self.respect_list.append(respect) + + def plot(self): + iterations = np.arange(self.len) + nice_plot() + plt.title("Rewards") + plt.plot(iterations, np.array(self.reward_list)) + plt.xlabel("Episodes") + plt.show() + + + + + + +def rollout(env, agent, state_norm, initial_s_state=None, plot=False, title=""): + """Rollouts a trajectory from the policy""" + + if initial_s_state is None: + s_state = env.reset() + else: + s_state = env.s_reset_to(initial_s_state) + + s_Trajectory = np.zeros((env.max_episode_steps+1, env.state_size)) + x_Trajectory = np.zeros((env.max_episode_steps+1, env.state_size)) + s_Trajectory[0] = s_state + x_Trajectory[0] = env.x_state + Actions = np.zeros((env.max_episode_steps, env.action_size)) + + for t in range(env.max_episode_steps): + with torch.no_grad(): + action = agent.evaluate(s_state, state_norm) + s_state, reward, done, _ = env.step(action) + s_Trajectory[t+1] = s_state + x_Trajectory[t+1] = env.x_state + Actions[t] = action + if done: break + + if plot: + print(f"Vertical velocity at landing: {-s_state[1]:.1f} ft/s") + plot_traj(env, x_Trajectory[:t+2], Actions[:t+1], title) + phase_portrait_h(env, s_Trajectory[:t+2], title) + + return s_Trajectory[:t+2], x_Trajectory[:t+2], Actions[:t+1] + + + +def multi_phase_portraits_h(env, s_Trajs): + """Plots the phase portrait (h, h dot) """ + + fig, ax = nice_plot() + + # Colors + buffer_color = (0., 1., 0., 0.5) + start_color = (0.2, 0.2, 0.2) + traj_color = '#1f77b4' + + plt.title("Phase portrait") + # Buffer + buffer_2D_vertices = [[-env.y_min, -env.s2_min], + [-env.y_max, -env.s2_min], + [-env.y_min, -env.y_dot_max]] + buffer = plt.Polygon(xy=buffer_2D_vertices, color=buffer_color) + ax.add_patch(buffer) + + # Trajectories + N = len(s_Trajs) + for i in range(N): + plt.plot(-s_Trajs[i][:, 0], -s_Trajs[i][:, 1], color=traj_color, linewidth=3) + plt.scatter(-s_Trajs[i][0, 0], -s_Trajs[i][0, 1], s=30, color=start_color, zorder=3) + + plt.xlabel("Altitude $h$ (ft)") + 
plt.ylabel("Vertical speed $\dot h$ (ft/s)") + + start_marker = mlines.Line2D([], [], color=start_color, marker='o', linestyle='None', markersize=10, label='start') + traj_marker = mlines.Line2D([], [], color=traj_color, linewidth='3', markersize=10, label='trajectories') + buffer_marker = mlines.Line2D([], [], color=buffer_color, marker='^', linestyle='None', markersize=20, label='affine buffer $\mathcal{B}$') + plt.legend(handles=[start_marker, traj_marker, buffer_marker], frameon=False, + borderpad=.0, loc="upper right", + labelspacing=0.3, handletextpad=0.5, handlelength=1.4) + plt.show() + + +def multi_plot_trajs(env, x_Trajs, Actions=None): + """Plots the trajectories of the space shuttle. + x_state = [v gamma h] action = [alpha] """ + + N = len(x_Trajs) + + fig, ax = nice_plot() + for i in range(N): + time = env.dt * np.arange(x_Trajs[i].shape[0]) + plt.scatter(time, x_Trajs[i][:, 0], s=10) + plt.ylabel("Velocity (ft/s)") + plt.xlabel("time (s)") + plt.show() + + fig, ax = nice_plot() + for i in range(N): + time = env.dt * np.arange(x_Trajs[i].shape[0]) + plt.scatter(time, x_Trajs[i][:, 1]*180/np.pi, s=10) + plt.plot([time[0], time[-1]], [0., 0.], c="orange") + plt.ylabel("Flight path angle (deg)") + plt.xlabel("time (s)") + plt.show() + + fig, ax = nice_plot() + for i in range(N): + time = env.dt * np.arange(x_Trajs[i].shape[0]) + plt.scatter(time, x_Trajs[i][:, 2], s=10) + plt.ylabel("altitude h (ft)") + plt.xlabel("time (s)") + plt.show() + + if Actions is not None: + fig, ax = nice_plot() + for i in range(N): + time = env.dt * np.arange(Actions[i].shape[0]) + Alpha = env.action_to_alpha(Actions[i]) + plt.scatter(time, Alpha, s=10) + plt.ylabel("angle of attack alpha (deg)") + plt.xlabel("time (s)") + plt.show() + + + + + + +#%% Utils to save and load the whole framework + + +import json +import argparse +from PPO import PPO +from SpaceShuttle import ShuttleEnv +from normalization import Normalization + + + + +def save(filename, args, state_norm, agent): + """Saving the arguments, state_norm and controller""" + agent.save(filename) + args_2 = copy.deepcopy(args) + + ### Convert arrays into lists as arrays cannot be stored in json + args_2.buffer_vertices = args.buffer_vertices.tolist() + args_2.state_norm_n = state_norm.running_ms.n + args_2.state_norm_mean = state_norm.running_ms.mean.tolist() + args_2.state_norm_S = state_norm.running_ms.S.tolist() + args_2.state_norm_std = state_norm.running_ms.std.tolist() + + with open(filename + '_args.txt', 'w') as f: + json.dump(args_2.__dict__, f, indent=2) + + + +def load(filename): + """Loads the arguments, environment, state_norm and controller""" + + parser = argparse.ArgumentParser("Hyperparameters Setting for PPO-continuous") + args = parser.parse_args() + with open(filename + '_args.txt', 'r') as f: + args.__dict__ = json.load(f) + + ### Convert lists back to arrays as arrays cannot be stored in json + args.buffer_vertices = np.array(args.buffer_vertices) + + ### state normalization + state_norm = Normalization(shape=args.state_dim) + state_norm.running_ms.n = args.state_norm_n + state_norm.running_ms.mean = np.array(args.state_norm_mean) + state_norm.running_ms.S = np.array(args.state_norm_S) + state_norm.running_ms.std = np.array(args.state_norm_std) + + np.random.seed(args.seed) + torch.manual_seed(args.seed) + env = ShuttleEnv(args) + + agent = PPO(args) + agent.load(filename) + if args.POLICEd and args.enlarging_buffer: # sets buffer to fully enlarged + agent.actor.iter = args.max_iter_enlargment+1 + agent.actor.vertices = 
agent.actor.buffer_vertices + + return args, state_norm, env, agent +
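+
+
+#%% Example usage
+
+# Minimal sketch of how the utilities above fit together once a training run has
+# produced a checkpoint. The filename is a placeholder; the function is never
+# called automatically and only uses `load`, `rollout` and `respect_ratio`
+# defined in this file.
+def example_load_and_check(filename="trained_policy"):
+    """Loads a saved agent, plots one rollout, and prints the constraint respect ratio."""
+    args, state_norm, env, agent = load(filename)
+    s_traj, x_traj, actions = rollout(env, agent, state_norm, plot=True, title="loaded policy")
+    ratio = respect_ratio(args, env, agent, state_norm)
+    print(f"Constraint respected on {100*ratio:.1f}% of the buffer grid")
+    return s_traj, x_traj, actions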