From 96c9bcfd8d3f45fe43b3857a80fa1a42f983970b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 30 Oct 2012 23:32:38 -0700 Subject: Cancel spot instance requests when exiting spark-ec2. --- ec2/spark_ec2.py | 52 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 21 deletions(-) (limited to 'ec2/spark_ec2.py') diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 6a3647b218..c0926e214f 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -233,27 +233,37 @@ def launch_cluster(conn, opts, cluster_name): block_device_map = block_map) my_req_ids = [req.id for req in slave_reqs] print "Waiting for spot instances to be granted..." - while True: - time.sleep(10) - reqs = conn.get_all_spot_instance_requests() - id_to_req = {} - for r in reqs: - id_to_req[r.id] = r - active = 0 - instance_ids = [] - for i in my_req_ids: - if id_to_req[i].state == "active": - active += 1 - instance_ids.append(id_to_req[i].instance_id) - if active == opts.slaves: - print "All %d slaves granted" % opts.slaves - reservations = conn.get_all_instances(instance_ids) - slave_nodes = [] - for r in reservations: - slave_nodes += r.instances - break - else: - print "%d of %d slaves granted, waiting longer" % (active, opts.slaves) + try: + while True: + time.sleep(10) + reqs = conn.get_all_spot_instance_requests() + id_to_req = {} + for r in reqs: + id_to_req[r.id] = r + active_instance_ids = [] + for i in my_req_ids: + if i in id_to_req and id_to_req[i].state == "active": + active_instance_ids.append(id_to_req[i].instance_id) + if len(active_instance_ids) == opts.slaves: + print "All %d slaves granted" % opts.slaves + reservations = conn.get_all_instances(active_instance_ids) + slave_nodes = [] + for r in reservations: + slave_nodes += r.instances + break + else: + print "%d of %d slaves granted, waiting longer" % ( + len(active_instance_ids), opts.slaves) + except: + print "Canceling spot instance requests" + conn.cancel_spot_instance_requests(my_req_ids) + # 
Log a warning if any of these requests actually launched instances: + (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster( + conn, opts, cluster_name, die_on_error=False) + running = len(master_nodes) + len(slave_nodes) + len(zoo_nodes) + if running: + print >> stderr, ("WARNING: %d instances are still running" % running) + sys.exit(0) else: # Launch non-spot instances slave_res = image.run(key_name = opts.key_pair, -- cgit v1.2.3 From 594eed31c43ecbefed069d827b388cdd54456277 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 3 Nov 2012 17:02:47 -0700 Subject: Fix check for existing instances during EC2 launch. --- ec2/spark_ec2.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) (limited to 'ec2/spark_ec2.py') diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index c0926e214f..2ca4d8020d 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -180,16 +180,12 @@ def launch_cluster(conn, opts, cluster_name): zoo_group.authorize('tcp', 3888, 3888, '0.0.0.0/0') # Check if instances are already running in our groups - print "Checking for running cluster..." 
- reservations = conn.get_all_instances() - for res in reservations: - group_names = [g.id for g in res.groups] - if master_group.name in group_names or slave_group.name in group_names or zoo_group.name in group_names: - active = [i for i in res.instances if is_active(i)] - if len(active) > 0: - print >> stderr, ("ERROR: There are already instances running in " + - "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name)) - sys.exit(1) + active_nodes = get_existing_cluster(conn, opts, cluster_name, + die_on_error=False) + if any(active_nodes): + print >> stderr, ("ERROR: There are already instances running in " + + "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name)) + sys.exit(1) # Figure out the latest AMI from our static URL if opts.ami == "latest": -- cgit v1.2.3 From 6d22f7ccb80f21f0622a3740d8fb3acd66a5b29e Mon Sep 17 00:00:00 2001 From: Peter Sankauskas Date: Fri, 16 Nov 2012 14:02:43 -0800 Subject: Delete security groups when deleting the cluster. As many operations are done on instances in specific security groups, this seems like a reasonable thing to clean up. --- ec2/spark_ec2.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'ec2/spark_ec2.py') diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 2ca4d8020d..17276db6e5 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -509,6 +509,20 @@ def main(): print "Terminating zoo..." 
for inst in zoo_nodes: inst.terminate() + # Delete security groups as well + group_names = [cluster_name + "-master", cluster_name + "-slaves", cluster_name + "-zoo"] + groups = conn.get_all_security_groups() + for group in groups: + if group.name in group_names: + print "Deleting security group " + group.name + # Delete individual rules before deleting group to remove dependencies + for rule in group.rules: + for grant in rule.grants: + group.revoke(ip_protocol=rule.ip_protocol, + from_port=rule.from_port, + to_port=rule.to_port, + src_group=grant) + conn.delete_security_group(group.name) elif action == "login": (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster( -- cgit v1.2.3 From 32442ee1e109d834d2359506f0161df8df8caf03 Mon Sep 17 00:00:00 2001 From: Peter Sankauskas Date: Fri, 16 Nov 2012 17:25:28 -0800 Subject: Giving the Spark EC2 script the ability to launch instances spread across multiple availability zones in order to make the cluster more resilient to failure --- ec2/spark_ec2.py | 80 ++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 58 insertions(+), 22 deletions(-) (limited to 'ec2/spark_ec2.py') diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 2ca4d8020d..a3138d6ef7 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -61,7 +61,8 @@ def parse_args(): parser.add_option("-r", "--region", default="us-east-1", help="EC2 region zone to launch instances in") parser.add_option("-z", "--zone", default="", - help="Availability zone to launch instances in") + help="Availability zone to launch instances in, or 'all' to spread " + + "slaves across multiple") parser.add_option("-a", "--ami", default="latest", help="Amazon Machine Image ID to use, or 'latest' to use latest " + "available AMI (default: latest)") @@ -217,17 +218,25 @@ def launch_cluster(conn, opts, cluster_name): # Launch spot instances with the requested price print ("Requesting %d slaves as spot instances with price $%.3f" % (opts.slaves, 
opts.spot_price)) - slave_reqs = conn.request_spot_instances( - price = opts.spot_price, - image_id = opts.ami, - launch_group = "launch-group-%s" % cluster_name, - placement = opts.zone, - count = opts.slaves, - key_name = opts.key_pair, - security_groups = [slave_group], - instance_type = opts.instance_type, - block_device_map = block_map) - my_req_ids = [req.id for req in slave_reqs] + zones = get_zones(conn, opts) + num_zones = len(zones) + i = 0 + my_req_ids = [] + for zone in zones: + num_slaves_this_zone = get_partition(opts.slaves, num_zones, i) + slave_reqs = conn.request_spot_instances( + price = opts.spot_price, + image_id = opts.ami, + launch_group = "launch-group-%s" % cluster_name, + placement = zone, + count = num_slaves_this_zone, + key_name = opts.key_pair, + security_groups = [slave_group], + instance_type = opts.instance_type, + block_device_map = block_map) + my_req_ids += [req.id for req in slave_reqs] + i += 1 + print "Waiting for spot instances to be granted..." try: while True: @@ -262,20 +271,30 @@ def launch_cluster(conn, opts, cluster_name): sys.exit(0) else: # Launch non-spot instances - slave_res = image.run(key_name = opts.key_pair, - security_groups = [slave_group], - instance_type = opts.instance_type, - placement = opts.zone, - min_count = opts.slaves, - max_count = opts.slaves, - block_device_map = block_map) - slave_nodes = slave_res.instances - print "Launched slaves, regid = " + slave_res.id + zones = get_zones(conn, opts) + num_zones = len(zones) + i = 0 + slave_nodes = [] + for zone in zones: + num_slaves_this_zone = get_partition(opts.slaves, num_zones, i) + slave_res = image.run(key_name = opts.key_pair, + security_groups = [slave_group], + instance_type = opts.instance_type, + placement = zone, + min_count = num_slaves_this_zone, + max_count = num_slaves_this_zone, + block_device_map = block_map) + slave_nodes += slave_res.instances + print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone, + zone, 
slave_res.id) + i += 1 # Launch masters master_type = opts.master_instance_type if master_type == "": master_type = opts.instance_type + if opts.zone == 'all': + opts.zone = random.choice(conn.get_all_zones()).name master_res = image.run(key_name = opts.key_pair, security_groups = [master_group], instance_type = master_type, @@ -284,7 +303,7 @@ def launch_cluster(conn, opts, cluster_name): max_count = 1, block_device_map = block_map) master_nodes = master_res.instances - print "Launched master, regid = " + master_res.id + print "Launched master in %s, regid = %s" % (zone, master_res.id) zoo_nodes = [] @@ -474,6 +493,23 @@ def ssh(host, opts, command): (opts.identity_file, opts.user, host, command), shell=True) +# Gets a list of zones to launch instances in +def get_zones(conn, opts): + if opts.zone == 'all': + zones = [z.name for z in conn.get_all_zones()] + else: + zones = [opts.zone] + return zones + + +# Gets the number of items in a partition +def get_partition(total, num_partitions, current_partitions): + num_slaves_this_zone = total / num_partitions + if (total % num_partitions) - current_partitions > 0: + num_slaves_this_zone += 1 + return num_slaves_this_zone + + def main(): (opts, action, cluster_name) = parse_args() conn = boto.ec2.connect_to_region(opts.region) -- cgit v1.2.3 From 606d252d264b75943983915b20a8d0e7a8a7d20f Mon Sep 17 00:00:00 2001 From: Peter Sankauskas Date: Sat, 17 Nov 2012 23:09:11 -0800 Subject: Adding comment about additional bandwidth charges --- ec2/spark_ec2.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'ec2/spark_ec2.py') diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index a3138d6ef7..2f48439549 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -62,7 +62,8 @@ def parse_args(): help="EC2 region zone to launch instances in") parser.add_option("-z", "--zone", default="", help="Availability zone to launch instances in, or 'all' to spread " + - "slaves across multiple") + "slaves across multiple (an 
additional $0.01/Gb for bandwidth " + "between zones applies)")
print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " + + "must be set") + sys.exit(1) + if os.getenv('AWS_SECRET_ACCESS_KEY') == None: + print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " + + "must be set") + sys.exit(1) return (opts, action, cluster_name) @@ -476,7 +481,11 @@ def ssh(host, opts, command): def main(): (opts, action, cluster_name) = parse_args() - conn = boto.ec2.connect_to_region(opts.region) + try: + conn = ec2.connect_to_region(opts.region) + except Exception as e: + print >> stderr, (e) + sys.exit(1) # Select an AZ at random if it was not specified. if opts.zone == "": -- cgit v1.2.3