diff --git a/ec2_run.py b/ec2_run.py
index d28dc0f..0a3b132 100644
--- a/ec2_run.py
+++ b/ec2_run.py
@@ -59,7 +59,7 @@
 # Default location to get the spark-ec2 scripts (and ami-list) from
 DEFAULT_EC2_RUN_GITHUB_REPO = "https://github.com/jinliangwei/ec2-run.git"
-DEFAULT_EC2_RUN_BRANCH = "master"
+DEFAULT_EC2_RUN_BRANCH = "main"


 class UsageError(Exception):
     pass

@@ -107,7 +107,7 @@ def parse_args():
     parser.add_option(
         "-z", "--zone", default="",
         help="Availability zone to launch instances in, or 'all' to spread " +
-             "slaves across multiple (an additional $0.01/Gb for bandwidth" +
+             "subordinates across multiple (an additional $0.01/Gb for bandwidth " +
              "between zones applies) (default: a single zone chosen at random)")
     parser.add_option(
         "-a", "--ami",
@@ -129,7 +129,7 @@ def parse_args():
              "created.")
     parser.add_option(
         "--spot-price", metavar="PRICE", type="float",
-        help="If specified, launch slaves as spot instances with the given " +
+        help="If specified, launch subordinates as spot instances with the given " +
              "maximum price (in dollars)")
     parser.add_option(
         "-u", "--user", default="ubuntu",
@@ -242,7 +242,7 @@ def get_or_make_group(conn, name, vpc_id):

 # Launch a cluster of the given name, by setting up its security groups,
 # and then starting new instances in them.
-# Returns a tuple of EC2 reservation objects for the slaves
+# Returns a list of EC2 instance objects for the subordinates.
 # Fails if there are already instances running in the cluster's groups.
 def launch_cluster(conn, opts, num_nodes, cluster_name):
     if opts.identity_file is None:
@@ -255,25 +255,25 @@ def launch_cluster(conn, opts, num_nodes, cluster_name):

     print("Setting up security groups...")
-    slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id)
+    subordinate_group = get_or_make_group(conn, cluster_name + "-subordinates", opts.vpc_id)
     authorized_address = opts.authorized_address
-    if slave_group.rules == []:  # Group was just now created
+    if subordinate_group.rules == []:  # Group was just now created
         if opts.vpc_id is None:
-            slave_group.authorize(src_group=slave_group)
+            subordinate_group.authorize(src_group=subordinate_group)
         else:
-            slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
-                                  src_group=slave_group)
-            slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
-                                  src_group=slave_group)
-            slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
-                                  src_group=slave_group)
-        slave_group.authorize('tcp', 22, 22, authorized_address)
+            subordinate_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+                                        src_group=subordinate_group)
+            subordinate_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+                                        src_group=subordinate_group)
+            subordinate_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+                                        src_group=subordinate_group)
+        subordinate_group.authorize('tcp', 22, 22, authorized_address)

     # Check if instances are already running in our groups
-    existing_slaves = get_existing_cluster(conn, opts, cluster_name, die_on_error=False)
-    if existing_slaves:
+    existing_subordinates = get_existing_cluster(conn, opts, cluster_name, die_on_error=False)
+    if existing_subordinates:
         print("ERROR: There are already instances running in group %s" %
-              slave_group.name, file=stderr)
+              subordinate_group.name, file=stderr)
         sys.exit(1)

     if opts.ami is None:
@@ -305,31 +305,31 @@ def launch_cluster(conn, opts, num_nodes, cluster_name):
             device.delete_on_termination = True
             block_map["/dev/sd" + chr(ord('s') + i)] = device

-    # Launch slaves
+    # Launch subordinates
     if opts.spot_price is not None:
         # Launch spot instances with the requested price
-        print("Requesting %d slaves as spot instances with price $%.3f" %
+        print("Requesting %d subordinates as spot instances with price $%.3f" %
               (num_nodes, opts.spot_price))
         zones = get_zones(conn, opts)
         num_zones = len(zones)
         i = 0
         my_req_ids = []
         for zone in zones:
-            num_slaves_this_zone = get_partition(num_nodes, num_zones, i)
-            slave_reqs = conn.request_spot_instances(
+            num_subordinates_this_zone = get_partition(num_nodes, num_zones, i)
+            subordinate_reqs = conn.request_spot_instances(
                 price=opts.spot_price,
                 image_id=opts.ami,
                 launch_group="launch-group-%s" % cluster_name,
                 placement=zone,
-                count=num_slaves_this_zone,
+                count=num_subordinates_this_zone,
                 key_name=opts.key_pair,
-                security_group_ids=[slave_group.id] + additional_group_ids,
+                security_group_ids=[subordinate_group.id] + additional_group_ids,
                 instance_type=opts.instance_type,
                 block_device_map=block_map,
                 subnet_id=opts.subnet_id,
                 placement_group=opts.placement_group,
                 instance_profile_name=opts.instance_profile_name)
-            my_req_ids += [req.id for req in slave_reqs]
+            my_req_ids += [req.id for req in subordinate_reqs]
             i += 1

         print("Waiting for spot instances to be granted...")
@@ -347,50 +347,50 @@
                 if len(active_instance_ids) == num_nodes:
                     print("All %d spot instances granted" % num_nodes)
                     reservations = conn.get_all_reservations(active_instance_ids)
-                    slave_nodes = []
+                    subordinate_nodes = []
                     for r in reservations:
-                        slave_nodes += r.instances
+                        subordinate_nodes += r.instances
                     break
                 else:
-                    print("%d of %d slave spot instances granted, waiting longer" % (
+                    print("%d of %d subordinate spot instances granted, waiting longer" % (
                         len(active_instance_ids), num_nodes))
         except:
             print("Canceling spot instance requests")
             conn.cancel_spot_instance_requests(my_req_ids)
             # Log a warning if any of these requests actually launched instances:
-            slave_nodes = get_existing_cluster(conn, opts, cluster_name, die_on_error=False)
-            running = len(slave_nodes)
+            subordinate_nodes = get_existing_cluster(conn, opts, cluster_name, die_on_error=False)
+            running = len(subordinate_nodes)
             if running:
                 print(("WARNING: %d instances are still running" % running), file=stderr)
             sys.exit(0)
     else:
-        print ("WARNING: --spot-price was not set; consider launch slaves as spot instances to save money")
+        print("WARNING: --spot-price was not set; consider launching subordinates as spot instances to save money")

         # Launch non-spot instances
         zones = get_zones(conn, opts)
         num_zones = len(zones)
         i = 0
-        slave_nodes = []
+        subordinate_nodes = []
         for zone in zones:
-            num_slaves_this_zone = get_partition(num_nodes, num_zones, i)
-            if num_slaves_this_zone > 0:
-                slave_res = image.run(
+            num_subordinates_this_zone = get_partition(num_nodes, num_zones, i)
+            if num_subordinates_this_zone > 0:
+                subordinate_res = image.run(
                     key_name=opts.key_pair,
-                    security_group_ids=[slave_group.id] + additional_group_ids,
+                    security_group_ids=[subordinate_group.id] + additional_group_ids,
                     instance_type=opts.instance_type,
                     placement=zone,
-                    min_count=num_slaves_this_zone,
-                    max_count=num_slaves_this_zone,
+                    min_count=num_subordinates_this_zone,
+                    max_count=num_subordinates_this_zone,
                     block_device_map=block_map,
                     subnet_id=opts.subnet_id,
                     placement_group=opts.placement_group,
                     instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior,
                     instance_profile_name=opts.instance_profile_name)
-                slave_nodes += slave_res.instances
print("Launched {s} slave{plural_s} in {z}, regid = {r}".format( - s=num_slaves_this_zone, - plural_s=('' if num_slaves_this_zone == 1 else 's'), + subordinate_nodes += subordinate_res.instances + print("Launched {s} subordinate{plural_s} in {z}, regid = {r}".format( + s=num_subordinates_this_zone, + plural_s=('' if num_subordinates_this_zone == 1 else 's'), z=zone, - r=slave_res.id)) + r=subordinate_res.id)) i += 1 @@ -404,19 +404,19 @@ def launch_cluster(conn, opts, num_nodes, cluster_name): map(str.strip, tag.split(':', 1)) for tag in opts.additional_tags.split(',') ) - for slave in slave_nodes: - slave.add_tags( - dict(additional_tags, Name='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)) + for subordinate in subordinate_nodes: + subordinate.add_tags( + dict(additional_tags, Name='{cn}-subordinate-{iid}'.format(cn=cluster_name, iid=subordinate.id)) ) # Return all the instances - return slave_nodes + return subordinate_nodes def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): """ Get the EC2 instances in an existing cluster if available. - Returns a tuple of lists of EC2 instance objects for the masters and slaves. + Returns a tuple of lists of EC2 instance objects for the mains and subordinates. """ print("Searching for existing cluster {c} in region {r}...".format( c=cluster_name, r=opts.region)) @@ -433,13 +433,13 @@ def get_instances(group_names): instances = itertools.chain.from_iterable(r.instances for r in reservations) return [i for i in instances if i.state not in ["shutting-down", "terminated"]] - slave_instances = get_instances([cluster_name + "-slaves"]) + subordinate_instances = get_instances([cluster_name + "-subordinates"]) - if any(slave_instances): - print("Found {s} slave{plural_s}.".format( - s=len(slave_instances), - plural_s=('' if len(slave_instances) == 1 else 's'))) - return slave_instances + if any(subordinate_instances): + print("Found {s} subordinate{plural_s}.".format( + s=len(subordinate_instances), + plural_s=('' if len(subordinate_instances) == 1 else 's'))) + return subordinate_instances def permit_root_ssh_login(host, opts): tries = 0 @@ -609,10 +609,10 @@ def get_zones(conn, opts): # Gets the number of items in a partition def get_partition(total, num_partitions, current_partitions): - num_slaves_this_zone = total // num_partitions + num_subordinates_this_zone = total // num_partitions if (total % num_partitions) - current_partitions > 0: - num_slaves_this_zone += 1 - return num_slaves_this_zone + num_subordinates_this_zone += 1 + return num_subordinates_this_zone # Gets the DNS name, taking into account the --private-ips flag def get_dns_name(instance, private_ips=False): @@ -627,50 +627,50 @@ def get_dns_name(instance, private_ips=False): def get_scripts_to_run(scripts_dir): return [fname for fname in os.listdir(scripts_dir) if os.path.isfile(os.path.join(scripts_dir, fname))] -def upload_scripts(slave_nodes, script_list, opts): +def upload_scripts(subordinate_nodes, script_list, opts): for idx in range(0, len(script_list)): script_fname = script_list[idx] - slave_node = slave_nodes[idx] + subordinate_node = subordinate_nodes[idx] fname_path = os.path.join(opts.scripts_dir, script_fname) - slave_name = get_dns_name(slave_node, opts.private_ips) + subordinate_name = get_dns_name(subordinate_node, opts.private_ips) command = [ 'rsync', '-v', '-e', stringify_command(ssh_command(opts)), fname_path, - '%s@%s:%s' % (opts.user, slave_name, opts.deploy_dir) + '%s@%s:%s' % (opts.user, subordinate_name, opts.deploy_dir) ] 
         print(command)
         subprocess.check_call(command)

-def download_results(slave_nodes, script_list, exit_codes, opts):
+def download_results(subordinate_nodes, script_list, exit_codes, opts):
     for idx in range(0, len(exit_codes)):
         if exit_codes[idx] != 0:
             continue
-        slave_node = slave_nodes[idx]
+        subordinate_node = subordinate_nodes[idx]
         script_name = script_list[idx]
-        slave_name = get_dns_name(slave_node, opts.private_ips)
-        fname_path = os.path.join(opts.output_dir, ("output." + script_name + "." + slave_name))
+        subordinate_name = get_dns_name(subordinate_node, opts.private_ips)
+        fname_path = os.path.join(opts.output_dir, ("output." + script_name + "." + subordinate_name))
         command = [
             'rsync', '-rv',
             '-e', stringify_command(ssh_command(opts)),
-            '%s@%s:%s' % (opts.user, slave_name, opts.output_path),
+            '%s@%s:%s' % (opts.user, subordinate_name, opts.output_path),
             fname_path
         ]
         print(command)
         subprocess.check_call(command)

-def run_and_wait_until_finish(slave_nodes, script_list, opts):
+def run_and_wait_until_finish(subordinate_nodes, script_list, opts):
     worker_proc_list = []
     for idx in range(0, len(script_list)):
         script_fname = script_list[idx]
-        slave_node = slave_nodes[idx]
+        subordinate_node = subordinate_nodes[idx]
         command_to_run = opts.command_to_run % script_fname
         print(command_to_run)
-        slave_name = get_dns_name(slave_node, opts.private_ips)
+        subordinate_name = get_dns_name(subordinate_node, opts.private_ips)
         worker_proc = subprocess.Popen(
-            ssh_command(opts) + ['%s@%s' % (opts.user, slave_name),
+            ssh_command(opts) + ['%s@%s' % (opts.user, subordinate_name),
                                  command_to_run])
         worker_proc_list.append(worker_proc)
     exit_codes = [p.wait() for p in worker_proc_list]
@@ -678,7 +678,7 @@
     num_succeeded_runs = sum([1 if ec == 0 else 0 for ec in exit_codes])
     print("%d of %d runs are successful" % (num_succeeded_runs, len(exit_codes)))
     if opts.output_dir != None and opts.output_path != None:
-        download_results(slave_nodes, script_list, exit_codes, opts)
+        download_results(subordinate_nodes, script_list, exit_codes, opts)
         print("results downloaded")

 def real_main():
@@ -724,26 +724,26 @@
         opts.zone = random.choice(conn.get_all_zones()).name

     if action == "start":
-        slave_nodes = launch_cluster(conn, opts, num_nodes, cluster_name)
+        subordinate_nodes = launch_cluster(conn, opts, num_nodes, cluster_name)
         wait_for_cluster_state(
             conn=conn,
             opts=opts,
-            cluster_instances=slave_nodes,
+            cluster_instances=subordinate_nodes,
             cluster_state='ssh-ready'
         )
-        upload_scripts(slave_nodes, script_list, opts)
-        run_and_wait_until_finish(slave_nodes, script_list, opts)
-        print("Stopping slaves...")
-        for inst in slave_nodes:
+        upload_scripts(subordinate_nodes, script_list, opts)
+        run_and_wait_until_finish(subordinate_nodes, script_list, opts)
+        print("Stopping subordinates...")
+        for inst in subordinate_nodes:
             if inst.state not in ["shutting-down", "terminated"]:
                 if inst.spot_instance_request_id:
                     inst.terminate()
                 else:
                     inst.stop()
     elif action == "run":
-        slave_nodes = get_existing_cluster(conn, opts, cluster_name, die_on_error=False)
-        upload_scripts(slave_nodes, script_list, opts)
-        run_and_wait_until_finish(slave_nodes, script_list, opts)
+        subordinate_nodes = get_existing_cluster(conn, opts, cluster_name, die_on_error=False)
+        upload_scripts(subordinate_nodes, script_list, opts)
+        run_and_wait_until_finish(subordinate_nodes, script_list, opts)

     elif action == "stop":
         response = raw_input(
@@ -751,13 +751,13 @@
cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " + "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" + "AMAZON EBS IF IT IS EBS-BACKED!!\n" + - "All data on spot-instance slaves will be lost.\n" + + "All data on spot-instance subordinates will be lost.\n" + "Stop cluster " + cluster_name + " (y/N): ") if response == "y": - slave_nodes = get_existing_cluster( + subordinate_nodes = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) - print("Stopping slaves...") - for inst in slave_nodes: + print("Stopping subordinates...") + for inst in subordinate_nodes: if inst.state not in ["shutting-down", "terminated"]: if inst.spot_instance_request_id: inst.terminate()