diff --git a/docs/plugins/rotate_asg_instances.md b/docs/plugins/rotate_asg_instances.md new file mode 100644 index 00000000..dc0b5832 --- /dev/null +++ b/docs/plugins/rotate_asg_instances.md @@ -0,0 +1,26 @@ +# Rotate ASG Instances Plugin + +## Overview +The rotate ASG instances plugin rotates the outdated instances in Auto Scaling Groups. It compares the launch configuration and sees if any outdated instances are present. It detaches the instances first then shuts down the instance and waits for a new instance to replace the outdated one in ASG then proceeds to the next outdated instance. + +After all outdated instances are shutdown successfully, it terminates them and reaps the associated volumes. + +## Usage +It allows gracefully shutting down each instance instead of terminating them and killing all the running processes. + +## Configuration +The plugin uses the ssh username specified in the `MOONSHOT_SSH_USER` or the `LOGNAME` environment variable for logging into the ASG instances to shutdown. The value should be the username with which you have the access to the instances. For example: +```ruby +export MOONSHOT_SSH_USER=abhishek.rana +``` + +The plugin needs no additional configuration parameters: + +## Example +```ruby +Moonshot.config do |c| + # ... + c.plugins << Moonshot::Plugins::RotateAsgInstances.new + # ... +end +``` diff --git a/lib/plugins/rotate_asg_instances.rb b/lib/plugins/rotate_asg_instances.rb new file mode 100644 index 00000000..e923b75a --- /dev/null +++ b/lib/plugins/rotate_asg_instances.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +require 'aws-sdk' +require_relative 'rotate_asg_instances/asg' + +module Moonshot + module Plugins + # Rotate ASG instances after update. + class RotateAsgInstances + def post_update(resources) + asg = ASG.new(resources) + asg.rotate_asg_instances + asg.teardown_outdated_instances + end + end + end +end diff --git a/lib/plugins/rotate_asg_instances/asg.rb b/lib/plugins/rotate_asg_instances/asg.rb new file mode 100644 index 00000000..0263aaaa --- /dev/null +++ b/lib/plugins/rotate_asg_instances/asg.rb @@ -0,0 +1,238 @@ +require 'moonshot/ssh_fork_executor' + +module Moonshot + class ASG # rubocop:disable Metrics/ClassLength + include Moonshot::CredsHelper + + def initialize(resources) + @resources = resources + @ilog = @resources.ilog + @ssh_user = ENV['MOONSHOT_SSH_USER'] || ENV['LOGNAME'] + end + + def asg + @asg ||= + Aws::AutoScaling::AutoScalingGroup.new(name: physical_resource_id) + end + + def rotate_asg_instances + @ilog.start_threaded('Rotating ASG instances...') do |step| + @step = step + @volumes_to_delete = outdated_volumes(outdated_instances) + @shutdown_instances = cycle_instances(outdated_instances) + @step.success('ASG instances rotated successfully!') + end + end + + def teardown_outdated_instances + @ilog.start_threaded('Tearing down outdated instances...') do |step| + @step = step + terminate_instances(@shutdown_instances) + reap_volumes(@volumes_to_delete) + @step.success('Outdated instances removed successfully!') + end + end + + def physical_resource_id + @resources.controller.stack + .resources_of_type('AWS::AutoScaling::AutoScalingGroup') + .first.physical_resource_id + end + + def outdated_instances + @outdated_instances ||= + asg.instances.reject do |i| + i.launch_configuration_name == asg.launch_configuration_name + end + end + + private + + def outdated_volumes(outdated_instances) + volumes = [] + outdated_instances.each do |i| + begin + inst = Aws::EC2::Instance.new(id: i.id) + volumes << inst.block_device_mappings.first.ebs.volume_id + rescue StandardError => e + # We're catching all errors here, because failing to reap a volume + # is not a critical error, will not cause issues with the update. + @step.failure('Failed to get volumes for instance '\ + "#{i.instance_id}: #{e.message}") + end + end + volumes + end + + # Cycle the instances in the ASG. + # + # Each instance will be detached one at a time, waiting for the new instance + # to be ready before stopping the worker and terminating the instance. + # + # @param instances [Array] (outdated instances) + # List of instances to cycle. Defaults to all instances with outdated + # launch configurations. + # @return [Array] (array of Aws::AutoScaling::Instance) + # List of shutdown instances. + def cycle_instances(outdated_instances) + shutdown_instances = [] + + if outdated_instances.empty? + @step.success('No instances cycled!') + return [] + end + + @step.success("Cycling #{outdated_instances.size} " \ + "of #{asg.instances.size} instances in " \ + "#{physical_resource_id}...") + + # Iterate over the instances in the stack, detaching and terminating each + # one. + outdated_instances.each do |i| + next if %w(Terminating Terminated).include?(i.lifecycle_state) + + wait_for_instance(i) + detach_instance(i) + + @step.success("Shutting down #{i.instance_id}") + shutdown_instance(i.instance_id) + shutdown_instances << i + end + + @step.success('All instances cycled.') + + shutdown_instances + end + + # Waits for an instance to reach a ready state. + # + # @param instance [Aws::AutoScaling::Instance] Auto scaling instance to wait + # for. + def wait_for_instance(instance, state = 'InService') + instance.wait_until(max_attempts: 60, delay: 10) do |i| + i.lifecycle_state == state + end + end + + # Detach an instance from its ASG. Re-attach if failed. + # + # @param instance [Aws::AutoScaling::Instance] Instance to detach. + def detach_instance(instance) + @step.success("Detaching instance: #{instance.instance_id}") + + # If the ASG can't be brought up to capacity, re-attach the instance. + begin + instance.detach(should_decrement_desired_capacity: false) + @step.success('- Waiting for the AutoScaling '\ + 'Group to be up to capacity') + wait_for_capacity + rescue StandardError => e + @step.failure("Error bringing the ASG up to capacity: #{e.message}") + @step.failure("Attaching instance: #{instance.instance_id}") + reattach_instance(instance) + raise e + end + end + + # Re-attach an instance to its ASG. + # + # @param instance [Aws::AutoScaling::Instance] Instance to re-attach. + def reattach_instance(instance) + instance.load + return unless instance.data.nil? \ + || %w(Detached Detaching).include?(instance.lifecycle_state) + + until instance.data.nil? || instance.lifecycle_state == 'Detached' + sleep 10 + instance.load + end + instance.attach + end + + # Terminate instances. + # + # @param instances [Array] (instances for termination) + # List of instances to terminate. Defaults to all instances with outdated + # launch configurations. + def terminate_instances(outdated_instances) + if outdated_instances.any? + @step.continue( + "Terminating #{outdated_instances.size} outdated instances..." + ) + end + outdated_instances.each do |asg_instance| + instance = Aws::EC2::Instance.new(asg_instance.instance_id) + begin + instance.load + rescue Aws::EC2::Errors::InvalidInstanceIDNotFound + next + end + + next unless %w(stopping stopped).include?(instance.state.name) + + instance.wait_until_stopped + + @step.continue("Terminating #{instance.instance_id}") + instance.terminate + end + end + + def reap_volumes(volumes) + volumes.each do |volume_id| + begin + @step.continue("Deleting volume: #{volume_id}") + ec2_client(region: ENV['AWS_REGION']) + .delete_volume(volume_id: volume_id) + rescue StandardError => e + # We're catching all errors here, because failing to reap a volume + # is not a critical error, will not cause issues with the release. + @step.failure("Failed to delete volume #{volume_id}: #{e.message}") + end + end + end + + # Waits for the ASG to reach the desired capacity. + def wait_for_capacity + @step.continue( + 'Replacing outdated instances with new instances for the AutoScaling Group...' + ) + # While we wait for the asg to reach capacity, report instance statuses + # to the user. + before_wait = proc do + instances = [] + asg.reload.instances.each do |i| + instances << " #{i.instance_id} (#{i.lifecycle_state})" + end + + @step.continue("Instances: #{instances.join(', ')}") + end + + asg.reload.wait_until(before_wait: before_wait, max_attempts: 60, + delay: 30) do |a| + instances_up = a.instances.select do |i| + i.lifecycle_state == 'InService' + end + instances_up.length == a.desired_capacity + end + @step.success('AutoScaling Group up to capacity!') + end + + # Shuts down an instance, waiting for the instance to stop processing requests + # first. We do this so that services will be stopped properly. + # + # @param id [String] ID of the instance to terminate. + def shutdown_instance(id) + instance = Aws::EC2::Instance.new(id: id) + options = [ + 'UserKnownHostsFile=/dev/null', + 'StrictHostKeyChecking=no' + ] + remote = "#{@ssh_user}@#{instance.public_dns_name}" + cmd = "'sudo shutdown -h now'" + remote_cmd = "ssh -o #{options.join(' -o ')} #{remote} #{cmd}" + SSHForkExecutor.new.run(remote_cmd) + + instance.wait_until_stopped + end + end +end diff --git a/spec/moonshot/plugins/asg.rb b/spec/moonshot/plugins/asg.rb new file mode 100644 index 00000000..0ab532ee --- /dev/null +++ b/spec/moonshot/plugins/asg.rb @@ -0,0 +1,285 @@ +describe Moonshot::ASG do + let(:name) { 'cdb-worker-dev-jarmes-WorkerASG-Q7DYM7901RBY' } + let(:instance_id) { 'i-585e91dc' } + let(:instance_id_2) { 'i-685e91dc' } + let(:system) { instance_double(System) } + let(:resources) do + instance_double( + Moonshot::Resources, + stack: instance_double( + Moonshot::Stack, + name: 'test_name', + parameters: {} + ), + ilog: instance_double(Moonshot::InteractiveLoggerProxy), + controller: instance_double( + Moonshot::Controller, + config: instance_double(Moonshot::ControllerConfig, app_name: 'test') + ) + ) + end + + let(:ilog) { resources.ilog } + let(:current_instance) do + instance_double( + Aws::AutoScaling::Instance, + instance_id: instance_id, + lifecycle_state: 'InService', + launch_configuration_name: 'configuration-2' + ) + end + + let(:outdated_instance) do + instance_double( + Aws::AutoScaling::Instance, + instance_id: instance_id_2, + lifecycle_state: 'InService', + launch_configuration_name: 'configuration-1' + ) + end + + before(:each) do + allow(System).to receive(:new) + .and_return(system) + stub_cf_client + end + + def stub_cf_client + @cf_client = instance_double(Aws::CloudFormation::Client) + allow(Aws::CloudFormation::Client).to receive(:new) do + assert_aws_retry_limit + @cf_client + end + allow(@cf_client).to receive(:validate_template).and_return(true) + end + + subject do + described_class.new(resources) + end + + describe '#cycle_instances' do + before(:each) do + allow(subject).to receive(:launch_configuration_name) \ + .and_return('configuration-2') + allow(subject).to receive(:instances).and_return( + [current_instance, outdated_instance] + ) + end + + it 'properly cycles ASG instances' do + expect(ilog).to receive(:start_threaded).with( + 'Rotating ASG instances...' + ) + ilog.start_threaded('Rotating ASG instances...') do |step| + @step = step + expect(subject).to receive(:wait_for_instance).with( + outdated_instance + ) + expect(subject).to receive(:detach_instance).with( + outdated_instance + ).and_call_original + expect(subject).to receive(:wait_for_capacity) + expect(subject).to receive(:shutdown_instance) + .with(instance_id_2) + expect(subject).to receive(:name).and_return(name) + expect(outdated_instance).to receive(:detach) + .with(should_decrement_desired_capacity: false) + + subject.cycle_instances + end + end + + it 'attempts to re-attach if waiting for capacity errors' do + expect(ilog).to receive(:start_threaded).with( + 'Rotating ASG instances...' + ) + ilog.start_threaded('Rotating ASG instances...') do |step| + @step = step + expect(subject).to receive(:wait_for_instance) + .with(outdated_instance) + expect(subject).to receive(:wait_for_capacity).and_raise + expect(subject).to receive(:reattach_instance) + .with(outdated_instance) + expect(subject).to receive(:name).and_return(name) + expect(outdated_instance).to receive(:detach) + .with(should_decrement_desired_capacity: false) + expect { subject.cycle_instances }.to raise_error(RuntimeError) + end + end + end + + describe '#initialize' do + before(:each) do + allow(System).to receive(:new) + .and_return(system) + stub_cf_client + end + context 'when MOONSHOT_SSH_USER is not defined' do + it 'uses LOGNAME for ssh_user' do + ENV['LOGNAME'] = 'SemiCoolDude' + ENV.delete('MOONSHOT_SSH_USER') + expect(subject.instance_variable_get(:@ssh_user)) + .to eq('SemiCoolDude') + end + end + + context 'when MOONSHOT_SSH_USER is defined' do + it 'uses MOONSHOT_SSH_USER for ssh_user' do + ENV['MOONSHOT_SSH_USER'] = 'CoolDude' + expect(subject.instance_variable_get(:@ssh_user)) + .to eq('CoolDude') + end + end + end + + describe '#shutdown_instance' do + let(:hostname) { 'ec2-54-236-102-14.compute-1.amazonaws.com' } + let(:instance) { instance_double(Aws::EC2::Instance) } + subject { super().send(:shutdown_instance, instance_id) } + + before(:each) do + allow(Aws::EC2::Instance).to receive(:new).and_return(instance) + allow(instance).to receive(:public_dns_name) \ + .and_return(hostname) + allow(System).to receive(:exec) + end + + it 'looks up the DNS name of the host' do + expect(instance).to receive(:public_dns_name) \ + .and_return(hostname) + subject + end + + it 'issues a shutdown to the instance' do + ENV['MOONSHOT_SSH_USER'] = 'ci_user' + expect(System).to receive(:exec).with( + /ssh (.*) ci_user@#{hostname} 'sudo shutdown -h now'/, + raise_on_failure: false + ) + subject + end + + it 'runs SSH with proper option to ignore host keys' do + ENV['MOONSHOT_SSH_USER'] = 'ci_user' + opts_string = '-o UserKnownHostsFile=/dev/null ' \ + '-o StrictHostKeyChecking=no' + expect(System).to receive(:exec).with( + /#{opts_string}/, + any_args + ) + subject + end + end + + describe '#detach_instance' do + it 'detaches instance and waits for capacity' do + expect(ilog).to receive(:start_threaded).with( + 'Rotating ASG instances...' + ) + ilog.start_threaded('Rotating ASG instances...') do |step| + @step = step + expect(outdated_instance).to receive(:detach).with( + should_decrement_desired_capacity: false + ) + expect(subject).to receive(:wait_for_capacity) + subject.detach_instance(outdated_instance) + end + end + + it 're-attaches instance after wait for capacity failed' do + expect(ilog).to receive(:start_threaded).with( + 'Rotating ASG instances...' + ) + ilog.start_threaded('Rotating ASG instances...') do |step| + @step = step + expect(outdated_instance).to receive(:detach).with( + should_decrement_desired_capacity: false + ) + expect(subject).to receive(:wait_for_capacity).and_raise + expect(subject).to receive(:reattach_instance).with( + outdated_instance + ) + expect do + subject.detach_instance(outdated_instance) + end.to raise_error(RuntimeError) + end + end + end + + describe '#terminate_instances' do + let(:stopping_ec2_instance) do + instance_double( + Aws::EC2::Instance, + instance_id: "i-#{SecureRandom.hex(4)}", + state: Struct.new(:name).new('stopping') + ) + end + + let(:stopped_ec2_instance) do + instance_double( + Aws::EC2::Instance, + instance_id: "i-#{SecureRandom.hex(4)}", + state: Struct.new(:name).new('stopped') + ) + end + + let(:terminated_ec2_instance) do + instance_double( + Aws::EC2::Instance, + instance_id: "i-#{SecureRandom.hex(4)}", + state: Struct.new(:name).new('terminated') + ) + end + + let(:ec2_instances) do + [ + stopping_ec2_instance, + stopped_ec2_instance, + terminated_ec2_instance + ] + end + + let(:asg_instances) do + [ + instance_double( + Aws::AutoScaling::Instance, + instance_id: stopping_ec2_instance.instance_id, + lifecycle_state: 'InService' + ), + instance_double( + Aws::AutoScaling::Instance, + instance_id: stopped_ec2_instance.instance_id, + lifecycle_state: 'InService' + ), + instance_double( + Aws::AutoScaling::Instance, + instance_id: terminated_ec2_instance.instance_id, + lifecycle_state: 'Terminated' + ) + ] + end + + before(:each) do + ec2_instances.each do |i| + allow(Aws::EC2::Instance).to( + receive(:new).with(i.instance_id).and_return(i) + ) + allow(i).to receive(:load) + allow(i).to receive(:wait_until_stopped) + end + end + + it 'terminates instances in stopping or stopped state' do + expect(ilog).to receive(:start_threaded).with( + 'Rotating ASG instances...' + ) + ilog.start_threaded('Rotating ASG instances...') do |step| + @step = step + expect(stopping_ec2_instance).to receive(:terminate).once + expect(stopped_ec2_instance).to receive(:terminate).once + expect(terminated_ec2_instance).not_to receive(:terminate) + subject.terminate_instances(asg_instances) + end + end + end +end