diff --git a/risk_aware_control/riskaware.py b/risk_aware_control/riskaware.py new file mode 100644 index 0000000..59a96c9 --- /dev/null +++ b/risk_aware_control/riskaware.py @@ -0,0 +1,225 @@ +import numpy as np +# import seaborn +import matplotlib.pyplot as plt +from matplotlib import animation + +class Runner: + + def __init__(self): + self.num_systems = 3 + self.num_states = 200 + self.limit = 10 + self.dx = self.limit * 2.0 / self.num_states + self.domain = np.linspace(-self.limit, + self.limit, + self.num_states)[:,None] + + self.var = np.array([2, 1, .5]) + + self.drift = 0#-.15 # systems drifts left + + # action set + self.u = np.array([0, 1, -1, .5, -.5, .25, -.25]) + + self.L = np.zeros((len(self.var), + len(self.u), self.num_states, self.num_states)) + for ii in range(len(self.u)): + for jj in range(self.num_states): + # set the system state + self.x = np.ones(self.num_systems) * self.domain[jj] + # get the probability distribution of x + old_px = self.gen_px() + # apply the control signal + self.physics(np.ones(self.num_systems) * self.u[ii]) + # get the new probability distribution of x + px = self.gen_px() + # calculate the change in the probability distribution + self.L[:, ii, :, jj] = np.copy(px - old_px).T + + # the initial state probability + self.x = np.zeros(self.num_systems) + self.px = self.gen_px() + + self.track_lane_center = [] + + # also need a cost function (Gaussian to move towards the center) + self.make_v() + + self.track_position = [] + self.track_target1 = [] + self.track_target2 = [] + + def make_gauss(self, mean=0, var=.5): + return np.exp(-(self.domain-mean)**2 / (2.0*var**2)) + + def make_v(self, mean=0): + # set up the road + self.v = self.make_gauss(mean=mean,var=2) * 5 - 1 + self.v[np.where(self.v > .5)] = .5 + # make a preference for being in the right lane + self.v += self.make_gauss(mean=mean+2,var=.6) + self.track_lane_center.append(mean+2) + + def physics(self, u): + self.x += (self.drift + u) # simple physics + self.x = np.minimum(self.limit, np.maximum(-self.limit, self.x)) + + def gen_px(self, x=None, var=None): + x = np.copy(self.x) if x is None else x + var = self.var if var is None else var + + px = self.make_gauss(x, var) + # make sure no negative values + px[np.where(px < 0)] = 0.0 + # make sure things sum to 1 + px /= np.sum(px, axis=0) * self.dx + return px + + + def anim_init(self): + self.v_line.set_data([], []) + self.px_line0.set_data([], []) + self.px_line1.set_data([], []) + self.px_line2.set_data([], []) + plt.legend(['value function', 'np.dot(Li, p(x))']) + return self.v_line, self.px_line0, self.px_line1, self.px_line2 + + def anim_animate(self, i): + + # calculate the weights for the actions + self.wu = np.zeros((self.num_systems, len(self.u))) + + for ii in range(self.num_systems): + # calculate weights for all actions simultaneously, v.T * L_i * p(x) + # constrain so that you can only weight actions positively + self.wu[ii] = np.maximum(np.ones(len(self.u)), + np.einsum('lj,ij->i', self.v.T, + np.einsum('ijk,k->ij', self.L[ii], self.px[:,ii]))) + # constrain so that total output power sum_j u_j**2 = 1 + if np.sum(self.wu[ii]) != 0: + self.wu[ii] /= np.sqrt(np.sum(self.wu[ii]**2)) + + # track information for plotting + self.track_position.append(np.copy(self.x)) + # get edges of value function + road = np.where(self.v >= .5) + self.track_target1.append(np.array([self.domain[road[0][0]], # left edge + self.domain[road[0][-1]]])) # right edge + lane = np.where(self.v >= .6) + 
self.track_target2.append(np.array([self.domain[lane[0][0]], # left edge + self.domain[lane[0][-1]]])) # right edge + # apply the control signal, simulate dynamics and get new state + self.physics(np.dot(self.wu, self.u)) + # get the new probability distribution of x + self.px = self.gen_px() + + # move the target around slowly + self.make_v(np.sin(i*.1)*5) + + self.px_line0.set_data(range(self.num_states), self.px[:,0]) + self.px_line1.set_data(range(self.num_states), self.px[:,1]) + self.px_line2.set_data(range(self.num_states), self.px[:,2]) + self.v_line.set_data(range(self.num_states), self.v) + + return self.v_line, self.px_line0, self.px_line1, self.px_line2 + + def run(self): + fig = plt.figure() + ax = fig.add_subplot(111) + self.v_line, = ax.plot([],[], color='r', lw=3) + self.px_line0, = ax.plot([],[], color='k', lw=3) + self.px_line1, = ax.plot([],[], color='k', lw=3) + self.px_line2, = ax.plot([],[], color='k', lw=3) + + plt.xlim([0, self.num_states-1]) + plt.xticks(np.linspace(0, self.num_states, 11), np.linspace(-10, 10, 11)) + plt.ylim([-1, 1.5]) + + anim = animation.FuncAnimation(fig, self.anim_animate, + init_func=self.anim_init, frames=500, + interval=100, blit=True) + + plt.show() + +if __name__ == '__main__': + + runner = Runner() + runner.run() + + # generate some nice plots + fig = plt.figure(figsize=(8, 8)) + + track_target1 = np.array(runner.track_target1).squeeze() + track_target2 = np.array(runner.track_target2).squeeze() + track_position = np.array(runner.track_position).squeeze() + runner.track_position = np.array(runner.track_position) + + time = track_position.shape[0] + X = np.arange(0, time) + Y = runner.domain + X, Y = np.meshgrid(X, Y) + + plt.subplot(runner.num_systems+2, 1, 1) + plt.title('Position on road') + + plt.subplot(runner.num_systems+2, 1, 4) + plt.fill_between(range(track_target1.shape[0]), + track_target1[:,0], track_target1[:,1], facecolor='y', alpha=.15) + plt.fill_between(range(track_target2.shape[0]), + track_target2[:,0], track_target2[:,1], facecolor='orange', alpha=.45) + + + for ii in range(runner.num_systems): + # plot borders, and path and heatmap for system ii + plt.subplot(runner.num_systems+2, 1, ii+1) + # plot road boundaries + plt.plot(track_target1, 'r--', lw=5) + + # plot a heat map showing sensor information + heatmap = np.zeros((runner.num_states, time)) + for jj in range(time): + heatmap[:,jj] = runner.make_gauss( + mean=runner.track_position[jj, ii], + var=runner.var[ii]).flatten() + plt.pcolormesh(X, Y, heatmap, cmap='Blues') + + # plot filled in zones of desirability, first the road + plt.fill_between(range(track_target1.shape[0]), + track_target1[:,0], track_target1[:,1], + facecolor='y', alpha=.15) + # and now the lane + plt.fill_between(range(track_target2.shape[0]), + track_target2[:,0], track_target2[:,1], + facecolor='orange', alpha=.45) + + # plot actual position of each system + line, = plt.plot(track_position[:,ii], 'k', lw=3) + + plt.legend([line], ['Variance = %.2f'%runner.var[ii]], + frameon=True, bbox_to_anchor=[1,1.05]) + plt.xlim([0, time-1]) + plt.ylabel('Position') + + # plot the borders and path of each + ax = plt.subplot(runner.num_systems+2, 1, 4) + + plt.plot(track_position[:,ii], lw=3) # plot actual position of each system + + plt.xlim([0, time-1]) + plt.legend(runner.var, frameon=True, bbox_to_anchor=[1, 1.05]) + plt.ylabel('Position') + ax.set_xticklabels([]) + ax.set_yticklabels([]) + + plt.subplot(runner.num_systems+2, 1, 5) + target_center = np.array(runner.track_lane_center)[:-1] + 
plt.plot((runner.track_position[:,0] - target_center)**2, lw=2) + plt.plot((runner.track_position[:,1] - target_center)**2, lw=2) + plt.plot((runner.track_position[:,2] - target_center)**2, lw=2) + plt.legend(runner.var, frameon=True, bbox_to_anchor=[1, 1.05]) + plt.title('Distance from center of lane') + plt.ylabel('Squared error') + plt.xlabel('Time') + + plt.tight_layout() + plt.show() diff --git a/risk_aware_control/riskaware_learning.py b/risk_aware_control/riskaware_learning.py new file mode 100644 index 0000000..7a21473 --- /dev/null +++ b/risk_aware_control/riskaware_learning.py @@ -0,0 +1,210 @@ +import numpy as np +import seaborn +import matplotlib.pyplot as plt +from matplotlib import animation + +class Runner: + + def __init__(self): + self.num_states = 200 + self.limit = 20 + self.dx = self.limit * 2.0 / self.num_states + self.domain = np.linspace(-self.limit, + self.limit, + self.num_states) + + self.x = 0 # initial position + self.var = .4 # variance in sensory information + self.px = self.make_gauss() # the initial state probability + + # constant slide of the system in this direction + self.drift = 1.5 + + # how often should the system randomly choose an action (0-1) + self.exploration = 0.0 + + # action set + self.u = np.array([0, .5, -1, 3, -5]) + # for learning, start with no knowledge about actions + self.L = [(np.random.random((self.num_states,self.num_states))*2-1)*1e-1 \ + for ii in range(len(self.u))] + + # create some more for plotting later + self.L_init = np.copy(self.L) + self.L_actual = [np.zeros((self.num_states, self.num_states)) \ + for ii in range(len(self.u))] + for ii in range(len(self.u)): + for jj in range(self.num_states): + # set the system state + self.x = self.domain[jj] + # get the probability distribution of x + old_px = self.gen_px() + # apply the control signal + self.physics(self.u[ii]) + # get the new probability distribution of x + px = self.gen_px() + # calculate the change in the probability distribution + self.L_actual[ii][:,jj] = np.copy(px - old_px) + + self.x = 0 # initial position + self.gamma = 1e-1 # learning rate + + # also need a cost function (Gaussian to move towards the center) + self.make_v() + + self.track_position = [] + self.track_target = [] + + def make_gauss(self, mean=0, var=.5): + return np.exp(-(self.domain-mean)**2 / (2*var**2)) + + def make_v(self, mean=0): + self.v = self.make_gauss(mean=mean,var=2) + \ + self.make_gauss(mean=mean,var=.01) + self.v = self.v * 2 - 1 + self.v[np.where(self.v > 0)] = 1.0 + + def physics(self, u): + self.x += self.drift + u # simple physics + self.x = min(self.limit, max(-self.limit, self.x)) + + def gen_px(self, x=None, var=None): + x = np.copy(self.x) if x is None else x + var = self.var if var is None else var + + px = self.make_gauss(x, var) + # make sure no negative values + px[np.where(px < 0)] = 0.0 + # make sure things sum to 1 + px /= np.max(np.sum(px) * self.dx) + return px + + def anim_init(self): + self.v_line.set_data([], []) + self.px_line.set_data([], []) + self.Lpx_line0.set_data([], []) + self.Lpx_line1.set_data([], []) + self.Lpx_line2.set_data([], []) + plt.legend(['value function', 'px', 'L0*px', 'L1*px', 'L2*px']) + return self.v_line, self.px_line, self.Lpx_line0, self.Lpx_line1, self.Lpx_line2 + + def anim_animate(self, i): + + # calculate the weights for the actions + self.wu = np.zeros(len(self.u)) + for ii, Li in enumerate(self.L): + # don't clip it here so we can tell the actual winner + self.wu[ii] = np.dot(self.v, np.dot(Li, self.px)) + + # set losers to 
zero (there can be only one action selected) + index = self.wu.argmax() + # add in some exploration + if int(np.random.random()*10) == 5: + index = np.random.choice(range(3)) + val = self.wu[index] + self.wu = np.zeros(len(self.u)) + # now clip it + self.wu[index] = 1# min(1, val) + # store the predicted change in probability + self.dpx_estimate = np.dot(self.L[index], self.px) + # also store the old px for calculating dpx + self.old_px = np.copy(self.px) + + # track information for plotting + self.track_position.append(np.copy(self.x)) + # get edges of value function + road = np.where(self.v == 1) + self.track_target.append(np.array([self.domain[road[0][0]], # left edge + self.domain[road[0][-1]]])) # right edge + + # simulate dynamics and get new state + self.physics(np.dot(self.wu, self.u)) + # generate the new px + self.px = self.gen_px() + # move the target around slowly + self.make_v(np.sin(i*.01)*(self.limit-1)) + + # do our learning + err = (self.px - self.old_px) - self.dpx_estimate + # NOTE: need to use old_px because that's what our + # dpx_estimate was calculated for + learn = self.gamma * np.outer(err, self.old_px) # learning_rate * err * activities + self.L[index] += learn + + # update the line plots + self.px_line.set_data(range(self.num_states), self.px) + self.Lpx_line0.set_data(range(self.num_states), np.dot(self.L[0], self.px)) + self.Lpx_line1.set_data(range(self.num_states), np.dot(self.L[1], self.px)) + self.Lpx_line2.set_data(range(self.num_states), np.dot(self.L[2], self.px)) + self.v_line.set_data(range(self.num_states), self.v) + + return self.v_line, self.px_line, self.Lpx_line0, self.Lpx_line1, self.Lpx_line2 + + def run(self): + fig = plt.figure() + ax = fig.add_subplot(111) + self.v_line, = ax.plot([],[], color='r', lw=3) + self.px_line, = ax.plot([],[], color='k', lw=3) + self.Lpx_line0, = ax.plot([],[], color='b', lw=3) + self.Lpx_line1, = ax.plot([],[], color='g', lw=3) + self.Lpx_line2, = ax.plot([],[], color='y', lw=3) + + plt.xlim([0, self.num_states-1]) + plt.xticks(np.linspace(0, self.num_states, 11), np.linspace(-10, 10, 11)) + plt.ylim([-1, 1]) + + anim = animation.FuncAnimation(fig, self.anim_animate, + init_func=self.anim_init, frames=10000, + interval=0, blit=True) + plt.show() + +if __name__ == '__main__': + + runner = Runner() + runner.run() + + # generate some nice plots + axes = [] + X, Y = np.meshgrid(runner.domain, runner.domain) + plt.figure(figsize=(9,9)) + for ii in range(len(runner.u)): + # axes.append(plt.subplot(1,3,ii)) + # plt.axis('equal') + # runner.L[ii][0,0] = 1 + # runner.L[ii][0,1] = -1 + # plt.pcolormesh(X, Y, runner.L[ii])#, cmap='terrain_r') + + # plot the starting vs ending vs actual L operators + # plot a heat map showing sensor information + axes.append(plt.subplot(4, len(runner.u), ii+1)) + plt.axis('equal') + runner.L_init[ii][0,0] = 1 + runner.L_init[ii][0,1] = -1 + plt.pcolormesh(X, Y, runner.L_init[ii]) + + axes.append(plt.subplot(4, len(runner.u), ii+1+len(runner.u))) + runner.L[ii][0,0] = 1 + runner.L[ii][0,1] = -1 + plt.axis('equal') + plt.pcolormesh(X, Y, runner.L[ii]) + + axes.append(plt.subplot(4, len(runner.u), (ii+1+2*len(runner.u)))) + plt.axis('equal') + runner.L_actual[ii][0,0] = 1 + runner.L_actual[ii][0,1] = -1 + plt.pcolormesh(X, Y, runner.L_actual[ii]) + + axes.append(plt.subplot(4, len(runner.u), (ii+1+3*len(runner.u)))) + plt.axis('equal') + diff = runner.L[ii] - runner.L_actual[ii] + diff[0,0] = 1 + diff[0,1] = -1 + plt.pcolormesh(X, Y, diff) + + axes[0].set_ylabel('Initial L operator') + 
axes[1].set_ylabel('Learned L operator') + axes[2].set_ylabel('Actual L operator') + axes[3].set_ylabel('Difference') + plt.suptitle('Learning L operators') + # plt.tight_layout() + plt.show() diff --git a/risk_aware_control/riskaware_learning_multiple.py b/risk_aware_control/riskaware_learning_multiple.py new file mode 100644 index 0000000..31b8c13 --- /dev/null +++ b/risk_aware_control/riskaware_learning_multiple.py @@ -0,0 +1,216 @@ +import numpy as np +import seaborn +import matplotlib.pyplot as plt +from matplotlib import animation + +class Runner: + """ The goal of this script is to be able to learn + the L operators of several actions simultaneously, + such that you don't have to choose only one action + at a time during training. """ + + def __init__(self): + self.num_states = 200 + self.limit = 20 + self.dx = self.limit * 2.0 / self.num_states + self.domain = np.linspace(-self.limit, + self.limit, + self.num_states) + + self.x = 0 # initial position + self.var = .4 # variance in sensory information + # the initial state probability + self.px = self.make_gauss() + + self.drift = 1.5 # constant slide of the system in this direction + + # action set + self.u = np.array([0, .5, -1, 3, -5]) + # for learning, start with no knowledge about actions + self.L = [(np.random.random((self.num_states,self.num_states))*2-1)*1e-1 \ + for ii in range(len(self.u))] + + # create some more for plotting later + self.L_init = np.copy(self.L) + self.L_actual = [np.zeros((self.num_states, self.num_states)) \ + for ii in range(len(self.u))] + for ii in range(len(self.u)): + for jj in range(self.num_states): + # set the system state + self.x = self.domain[jj] + # get the probability distribution of x + old_px = self.gen_px() + # apply the control signal + self.physics(self.u[ii]) + # get the new probability distribution of x + px = self.gen_px() + # calculate the change in the probability distribution + # self.L_actual[ii][jj] = np.copy(px_old - px) + self.L_actual[ii][:,jj] = np.copy(px - old_px) + # self.L[ii] = np.copy(self.L_actual[ii]) + + self.x = 0 # initial position + self.gamma = 1e-1 # learning rate + + # also need a cost function (Gaussian to move towards the center) + self.make_v() + + self.track_position = [] + self.track_target = [] + + def make_gauss(self, mean=0, var=.5): + return np.exp(-(self.domain-mean)**2 / (2*var**2)) + + def make_v(self, mean=0): + self.v = self.make_gauss(mean=mean,var=2) + \ + self.make_gauss(mean=mean,var=.01) + self.v = self.v * 2 - 1 + self.v[np.where(self.v > 0)] = 1.0 + + def physics(self, u): + self.x += self.drift + u # simple physics + self.x = min(self.limit, max(-self.limit, self.x)) + + def gen_px(self, x=None, var=None, noise=False): + x = np.copy(self.x) if x is None else x + var = self.var if var is None else var + + if noise is True: + x += int(np.random.random() * 2) + x = min(self.limit, max(-self.limit, x)) + px = self.make_gauss(x, var) + # make sure no negative values + px[np.where(px < 0)] = 0.0 + # make sure things sum to 1 + px /= np.max(np.sum(px) * self.dx) + return px + + def anim_init(self): + self.v_line.set_data([], []) + self.px_line.set_data([], []) + self.Lpx_line0.set_data([], []) + self.Lpx_line1.set_data([], []) + self.Lpx_line2.set_data([], []) + plt.legend(['value function', 'px', 'L0*px', 'L1*px', 'L2*px']) + return self.v_line, self.px_line, self.Lpx_line0, self.Lpx_line1, self.Lpx_line2 + + def anim_animate(self, i): + + # calculate the weights for the actions + self.wu = np.zeros(len(self.u)) + for ii, Li in 
enumerate(self.L): + # don't clip it here so we can tell the actual winner + self.wu[ii] = np.dot(self.v, np.dot(Li, self.px)) + + # set losers to zero (there can be only one action selected) + index = self.wu.argmax() + # add in some exploration + if int(np.random.random()*10) == 5: + index = np.random.choice(range(3)) + val = self.wu[index] + self.wu = np.zeros(len(self.u)) + # now clip it + self.wu[index] = 1# min(1, val) + # store the predicted change in probability + self.dpx_estimate = np.dot(self.L[index], self.px) + # also store the old px for calculating dpx + self.old_px = np.copy(self.px) + + # track information for plotting + self.track_position.append(np.copy(self.x)) + # get edges of value function + road = np.where(self.v == 1) + self.track_target.append(np.array([self.domain[road[0][0]], # left edge + self.domain[road[0][-1]]])) # right edge + + # simulate dynamics and get new state + self.physics(np.dot(self.wu, self.u)) + # generate the new px + self.px = self.gen_px(noise=False) + # move the target around slowly + self.make_v(np.sin(i*.01)*(self.limit-1)) + + # do our learning + err = (self.px - self.old_px) - self.dpx_estimate + # NOTE: need to use old_px because that's what our + # dpx_estimate was calculated for + learn = self.gamma * np.outer(err, self.old_px) # learning_rate * err * activities + self.L[index] += learn + + # update the line plots + self.px_line.set_data(range(self.num_states), self.px) + self.Lpx_line0.set_data(range(self.num_states), np.dot(self.L[0], self.px)) + self.Lpx_line1.set_data(range(self.num_states), np.dot(self.L[1], self.px)) + self.Lpx_line2.set_data(range(self.num_states), np.dot(self.L[2], self.px)) + self.v_line.set_data(range(self.num_states), self.v) + + return self.v_line, self.px_line, self.Lpx_line0, self.Lpx_line1, self.Lpx_line2 + + def run(self): + fig = plt.figure() + ax = fig.add_subplot(111) + self.v_line, = ax.plot([],[], color='r', lw=3) + self.px_line, = ax.plot([],[], color='k', lw=3) + self.Lpx_line0, = ax.plot([],[], color='b', lw=3) + self.Lpx_line1, = ax.plot([],[], color='g', lw=3) + self.Lpx_line2, = ax.plot([],[], color='y', lw=3) + + plt.xlim([0, self.num_states-1]) + plt.xticks(np.linspace(0, self.num_states, 11), np.linspace(-10, 10, 11)) + plt.ylim([-1, 1]) + + anim = animation.FuncAnimation(fig, self.anim_animate, + init_func=self.anim_init, frames=10000, + interval=0, blit=True) + plt.show() + +if __name__ == '__main__': + + runner = Runner() + runner.run() + + # generate some nice plots + axes = [] + X, Y = np.meshgrid(runner.domain, runner.domain) + plt.figure(figsize=(9,9)) + for ii in range(len(runner.u)): + # axes.append(plt.subplot(1,3,ii)) + # plt.axis('equal') + # runner.L[ii][0,0] = 1 + # runner.L[ii][0,1] = -1 + # plt.pcolormesh(X, Y, runner.L[ii])#, cmap='terrain_r') + + # plot the starting vs ending vs actual L operators + # plot a heat map showing sensor information + axes.append(plt.subplot(4, len(runner.u), ii+1)) + plt.axis('equal') + runner.L_init[ii][0,0] = 1 + runner.L_init[ii][0,1] = -1 + plt.pcolormesh(X, Y, runner.L_init[ii]) + + axes.append(plt.subplot(4, len(runner.u), ii+1+len(runner.u))) + runner.L[ii][0,0] = 1 + runner.L[ii][0,1] = -1 + plt.axis('equal') + plt.pcolormesh(X, Y, runner.L[ii]) + + axes.append(plt.subplot(4, len(runner.u), (ii+1+2*len(runner.u)))) + plt.axis('equal') + runner.L_actual[ii][0,0] = 1 + runner.L_actual[ii][0,1] = -1 + plt.pcolormesh(X, Y, runner.L_actual[ii]) + + axes.append(plt.subplot(4, len(runner.u), (ii+1+3*len(runner.u)))) + plt.axis('equal') + 
diff = runner.L[ii] - runner.L_actual[ii] + diff[0,0] = 1 + diff[0,1] = -1 + plt.pcolormesh(X, Y, diff) + + axes[0].set_ylabel('Initial L operator') + axes[1].set_ylabel('Learned L operator') + axes[2].set_ylabel('Actual L operator') + axes[3].set_ylabel('Difference') + plt.suptitle('Learning L operators') + # plt.tight_layout() + plt.show() diff --git a/risk_aware_control/riskaware_neurons.py b/risk_aware_control/riskaware_neurons.py new file mode 100644 index 0000000..d066178 --- /dev/null +++ b/risk_aware_control/riskaware_neurons.py @@ -0,0 +1,242 @@ +import numpy as np +import seaborn +import matplotlib.pyplot as plt +from matplotlib import animation + +class Runner: + """ In this class we're hooking up 200 neurons to the 'car', + so that half of them drive the car to the left, and half of + them drive the car to the right. The risk-aware system needs + to control the car through these neurons. """ + + def __init__(self): + self.num_states = 200 + self.limit = 20 + self.dx = self.limit * 2.0 / self.num_states + self.domain = np.linspace(-self.limit, + self.limit, + self.num_states) + + self.x = 0 # initial position + self.var = .4 # variance in sensory information + # the initial state probability + self.px = self.make_gauss() + + self.drift = 1.5 # constant slide of the system in this direction + + # action set + import nengo + model = nengo.Network() + # I think that the way for this to work is for the input + # to these neurons to be something like direct stimulation, + # and relatively strong, so projecting in either 10 or 0 + # as the two options...at least initially. + # TODO: train up a system where the input can be a range of values + # NOTE: this would possibly just be a whole other set of actions... + # so maybe the way to think about it is that activating each individual + # neuron is an action, and so is not activating each neuron. And then + # if you want to be able to input a range of values that's just more actions. + with model: + ensemble = nengo.Ensemble(n_neurons=200, dimensions=1) # dimensions is irrelevant + + # TODO: is the input here supposed to be direct neural stimulation? 
+ def get_input(t): + return self.u + node_in = nengo.Node(output=get_input, size_out=ensemblen_neurons) + # send the input to directly stimulate each neuron + nengo.Connection(node_in, ensemble.neurons) + + def set_output(t, x): + self.neuron_activity = np.copy(x) + node_out = nengo.Node(output=set_output, size_in=1) + + # half the neurons push to the left (negative weight) + # half the neurons push to the right (positive weight) + weights = np.random.random((200,1)) * 10.0 - 5.0 + nengo.Connection(ensemble, node_out, + transform=weights) + # TODO: add in some noise + + # for learning, start with no knowledge about actions + self.L = [(np.random.random((self.num_states,self.num_states))*2-1)*1e-1 \ + for ii in range(len(self.u))] + + # create some more for plotting later + self.L_init = np.copy(self.L) + self.L_actual = [np.zeros((self.num_states, self.num_states)) \ + for ii in range(len(self.u))] + for ii in range(len(self.u)): + for jj in range(self.num_states): + # set the system state + self.x = self.domain[jj] + # get the probability distribution of x + old_px = self.gen_px() + # apply the control signal + self.physics(self.u[ii]) + # get the new probability distribution of x + px = self.gen_px() + # calculate the change in the probability distribution + # self.L_actual[ii][jj] = np.copy(px_old - px) + self.L_actual[ii][:,jj] = np.copy(px - old_px) + # self.L[ii] = np.copy(self.L_actual[ii]) + + self.x = 0 # initial position + self.gamma = 1e-1 # learning rate + + # also need a cost function (Gaussian to move towards the center) + self.make_v() + + self.track_position = [] + self.track_target = [] + + def make_gauss(self, mean=0, var=.5): + return np.exp(-(self.domain-mean)**2 / (2*var**2)) + + def make_v(self, mean=0): + self.v = self.make_gauss(mean=mean,var=2) + \ + self.make_gauss(mean=mean,var=.01) + self.v = self.v * 2 - 1 + self.v[np.where(self.v > 0)] = 1.0 + + def physics(self, u): + self.x += self.drift + u # simple physics + self.x = min(self.limit, max(-self.limit, self.x)) + + def gen_px(self, x=None, var=None, noise=False): + x = np.copy(self.x) if x is None else x + var = self.var if var is None else var + + if noise is True: + x += int(np.random.random() * 2) + x = min(self.limit, max(-self.limit, x)) + px = self.make_gauss(x, var) + # make sure no negative values + px[np.where(px < 0)] = 0.0 + # make sure things sum to 1 + px /= np.max(np.sum(px) * self.dx) + return px + + def anim_init(self): + self.v_line.set_data([], []) + self.px_line.set_data([], []) + self.Lpx_line0.set_data([], []) + self.Lpx_line1.set_data([], []) + self.Lpx_line2.set_data([], []) + plt.legend(['value function', 'px', 'L0*px', 'L1*px', 'L2*px']) + return self.v_line, self.px_line, self.Lpx_line0, self.Lpx_line1, self.Lpx_line2 + + def anim_animate(self, i): + + # calculate the weights for the actions + self.wu = np.zeros(len(self.u)) + for ii, Li in enumerate(self.L): + # don't clip it here so we can tell the actual winner + self.wu[ii] = np.dot(self.v, np.dot(Li, self.px)) + + # set losers to zero (there can be only one action selected) + index = self.wu.argmax() + # add in some exploration + if int(np.random.random()*10) == 5: + index = np.random.choice(range(3)) + val = self.wu[index] + self.wu = np.zeros(len(self.u)) + # now clip it + self.wu[index] = 1# min(1, val) + # store the predicted change in probability + self.dpx_estimate = np.dot(self.L[index], self.px) + # also store the old px for calculating dpx + self.old_px = np.copy(self.px) + + # track information for plotting + 
self.track_position.append(np.copy(self.x)) + # get edges of value function + road = np.where(self.v == 1) + self.track_target.append(np.array([self.domain[road[0][0]], # left edge + self.domain[road[0][-1]]])) # right edge + + # simulate dynamics and get new state + self.physics(np.dot(self.wu, self.u)) + # generate the new px + self.px = self.gen_px(noise=False) + # move the target around slowly + self.make_v(np.sin(i*.01)*(self.limit-1)) + + # do our learning + err = (self.px - self.old_px) - self.dpx_estimate + # NOTE: need to use old_px because that's what our + # dpx_estimate was calculated for + learn = self.gamma * np.outer(err, self.old_px) # learning_rate * err * activities + self.L[index] += learn + + # update the line plots + self.px_line.set_data(range(self.num_states), self.px) + self.Lpx_line0.set_data(range(self.num_states), np.dot(self.L[0], self.px)) + self.Lpx_line1.set_data(range(self.num_states), np.dot(self.L[1], self.px)) + self.Lpx_line2.set_data(range(self.num_states), np.dot(self.L[2], self.px)) + self.v_line.set_data(range(self.num_states), self.v) + + return self.v_line, self.px_line, self.Lpx_line0, self.Lpx_line1, self.Lpx_line2 + + def run(self): + fig = plt.figure() + ax = fig.add_subplot(111) + self.v_line, = ax.plot([],[], color='r', lw=3) + self.px_line, = ax.plot([],[], color='k', lw=3) + self.Lpx_line0, = ax.plot([],[], color='b', lw=3) + self.Lpx_line1, = ax.plot([],[], color='g', lw=3) + self.Lpx_line2, = ax.plot([],[], color='y', lw=3) + + plt.xlim([0, self.num_states-1]) + plt.xticks(np.linspace(0, self.num_states, 11), np.linspace(-10, 10, 11)) + plt.ylim([-1, 1]) + + anim = animation.FuncAnimation(fig, self.anim_animate, + init_func=self.anim_init, frames=10000, + interval=0, blit=True) + plt.show() + +if __name__ == '__main__': + + runner = Runner() + runner.run() + + # generate some nice plots + axes = [] + X, Y = np.meshgrid(runner.domain, runner.domain) + plt.figure(figsize=(9,9)) + for ii in range(len(runner.u)): + + # plot the starting vs ending vs actual L operators + # plot a heat map showing sensor information + axes.append(plt.subplot(4, len(runner.u), ii+1)) + plt.axis('equal') + runner.L_init[ii][0,0] = 1 + runner.L_init[ii][0,1] = -1 + plt.pcolormesh(X, Y, runner.L_init[ii]) + + axes.append(plt.subplot(4, len(runner.u), ii+1+len(runner.u))) + runner.L[ii][0,0] = 1 + runner.L[ii][0,1] = -1 + plt.axis('equal') + plt.pcolormesh(X, Y, runner.L[ii]) + + axes.append(plt.subplot(4, len(runner.u), (ii+1+2*len(runner.u)))) + plt.axis('equal') + runner.L_actual[ii][0,0] = 1 + runner.L_actual[ii][0,1] = -1 + plt.pcolormesh(X, Y, runner.L_actual[ii]) + + axes.append(plt.subplot(4, len(runner.u), (ii+1+3*len(runner.u)))) + plt.axis('equal') + diff = runner.L[ii] - runner.L_actual[ii] + diff[0,0] = 1 + diff[0,1] = -1 + plt.pcolormesh(X, Y, diff) + + axes[0].set_ylabel('Initial L operator') + axes[1].set_ylabel('Learned L operator') + axes[2].set_ylabel('Actual L operator') + axes[3].set_ylabel('Difference') + plt.suptitle('Learning L operators') + # plt.tight_layout() + plt.show()
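All four scripts repeat the same core step: build (or learn) one operator L_i per action u_i that maps the current state distribution p(x) to the change it would undergo, score each action by the expected gain in value v . (L_i p(x)), and update an estimated operator from the observed change in p(x). The standalone numpy sketch below condenses that step on a toy domain; the helper names (gauss, build_L), the domain size, and the specific numbers are illustrative rather than taken from the scripts, and the drift term is left out for brevity.

import numpy as np

num_states, limit = 50, 10.0
domain = np.linspace(-limit, limit, num_states)
dx = 2.0 * limit / num_states
u_set = np.array([0.0, 1.0, -1.0])   # candidate actions
var = 0.5                            # sensor variance

def gauss(mean, var):
    # Gaussian over the domain, normalized so it integrates to ~1
    p = np.exp(-(domain - mean) ** 2 / (2.0 * var ** 2))
    return p / (np.sum(p) * dx)

def build_L(u):
    # column j holds the change in p(x) when action u is applied from domain[j]
    L = np.zeros((num_states, num_states))
    for j, xj in enumerate(domain):
        x_new = np.clip(xj + u, -limit, limit)   # the scripts' simple physics
        L[:, j] = gauss(x_new, var) - gauss(xj, var)
    return L

L_ops = [build_L(u) for u in u_set]

# value function: high near the road center
v = np.exp(-domain ** 2 / (2.0 * 2.0 ** 2))

# action selection: w_i = v . (L_i p(x)); pick the action that most
# increases expected value (the winner-take-all rule of the learning scripts)
px = gauss(3.0, var)
w = np.array([v @ (L @ px) for L in L_ops])
best = int(np.argmax(w))

# learning rule for an estimated operator L_hat:
# dL = gamma * ((px_new - px_old) - L_hat px_old) px_old^T
gamma = 1e-1
L_hat = np.zeros((num_states, num_states))
px_old = px
px_new = gauss(np.clip(3.0 + u_set[best], -limit, limit), var)
err = (px_new - px_old) - L_hat @ px_old
L_hat += gamma * np.outer(err, px_old)

riskaware.py computes the same per-action score for every action at once with its nested einsum over the precomputed L tensor and normalizes the resulting weights to unit power, whereas the learning scripts zero out everything except the argmax before applying the control.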
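After either learning script finishes, the final figure compares the initial, learned, and actual operators panel by panel; a scalar summary of the same comparison can be handy when tuning gamma. The snippet below is hypothetical (it is not part of the scripts) and assumes it runs after runner.run() returns:

for u_i, L_hat, L_true in zip(runner.u, runner.L, runner.L_actual):
    # relative Frobenius-norm error of the learned operator for each action
    err = np.linalg.norm(L_hat - L_true) / np.linalg.norm(L_true)
    print('u = %+.2f: relative operator error %.3f' % (u_i, err))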
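riskaware_neurons.py sketches the neural hookup but leaves it unfinished (its own TODOs say as much, and the action set self.u is never defined in that file). The snippet below is one plausible minimal wiring of the idea described in its comments: stimulate every neuron directly with a strong input, and read the car's net push out through fixed per-neuron weights, half pushing left and half pushing right. The seed, stimulation level, weight split, and synapse values are assumptions, not the authors' design.

import numpy as np
import nengo

n_neurons = 200
model = nengo.Network(seed=0)
with model:
    ens = nengo.Ensemble(n_neurons=n_neurons, dimensions=1)
    # direct stimulation: a strong constant current into every neuron
    stim = nengo.Node(output=np.ones(n_neurons) * 10.0)
    nengo.Connection(stim, ens.neurons)
    # readout: half the neurons push the car left, half push it right
    push = np.ones((1, n_neurons))
    push[0, :n_neurons // 2] = -1.0
    readout = nengo.Node(size_in=1)
    nengo.Connection(ens.neurons, readout, transform=push / n_neurons)
    probe = nengo.Probe(readout, synapse=0.01)

with nengo.Simulator(model) as sim:
    sim.run(0.5)
print(sim.data[probe][-1])   # net push applied to the 'car'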