diff options
Diffstat (limited to 'ec2')
-rwxr-xr-x | ec2/spark_ec2.py | 236 |
1 files changed, 168 insertions, 68 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 0b296332a2..a5384d3bda 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -19,7 +19,6 @@
 
 from __future__ import with_statement
 
-import boto
 import logging
 import os
 import random
@@ -31,8 +30,9 @@ import time
 import urllib2
 from optparse import OptionParser
 from sys import stderr
+import boto
 from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
-
+from boto import ec2
 
 # A static URL from which to figure out the latest Mesos EC2 AMI
 LATEST_AMI_URL = "https://s3.amazonaws.com/mesos-images/ids/latest-spark-0.6"
@@ -61,7 +61,9 @@ def parse_args():
   parser.add_option("-r", "--region", default="us-east-1",
       help="EC2 region zone to launch instances in")
   parser.add_option("-z", "--zone", default="",
-      help="Availability zone to launch instances in")
+      help="Availability zone to launch instances in, or 'all' to spread " +
+           "slaves across multiple (an additional $0.01/Gb for bandwidth " +
+           "between zones applies)")
   parser.add_option("-a", "--ami", default="latest",
       help="Amazon Machine Image ID to use, or 'latest' to use latest " +
            "available AMI (default: latest)")
@@ -84,6 +86,8 @@ def parse_args():
       help="'mesos' for a mesos cluster, 'standalone' for a standalone spark cluster (default: mesos)")
   parser.add_option("-u", "--user", default="root",
       help="The ssh user you want to connect as (default: root)")
+  parser.add_option("--delete-groups", action="store_true", default=False,
+      help="When destroying a cluster, also destroy the security groups that were created")
 
   (opts, args) = parser.parse_args()
   if len(args) != 2:
@@ -97,14 +101,20 @@ def parse_args():
   if opts.cluster_type not in ["mesos", "standalone"] and action == "launch":
     print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type)
     sys.exit(1)
-  if os.getenv('AWS_ACCESS_KEY_ID') == None:
-    print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " +
-                      "must be set")
-    sys.exit(1)
-  if os.getenv('AWS_SECRET_ACCESS_KEY') == None:
-    print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " +
-                      "must be set")
-    sys.exit(1)
+
+  # Boto config check
+  # http://boto.cloudhackers.com/en/latest/boto_config_tut.html
+  home_dir = os.getenv('HOME')
+  if home_dir is None or not os.path.isfile(home_dir + '/.boto'):
+    if not os.path.isfile('/etc/boto.cfg'):
+      if os.getenv('AWS_ACCESS_KEY_ID') is None:
+        print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " +
+                          "must be set")
+        sys.exit(1)
+      if os.getenv('AWS_SECRET_ACCESS_KEY') is None:
+        print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " +
+                          "must be set")
+        sys.exit(1)
   return (opts, action, cluster_name)
 
 
@@ -180,16 +190,12 @@ def launch_cluster(conn, opts, cluster_name):
     zoo_group.authorize('tcp', 3888, 3888, '0.0.0.0/0')
 
   # Check if instances are already running in our groups
-  print "Checking for running cluster..."
-  reservations = conn.get_all_instances()
-  for res in reservations:
-    group_names = [g.id for g in res.groups]
-    if master_group.name in group_names or slave_group.name in group_names or zoo_group.name in group_names:
-      active = [i for i in res.instances if is_active(i)]
-      if len(active) > 0:
-        print >> stderr, ("ERROR: There are already instances running in " +
-            "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
-        sys.exit(1)
+  active_nodes = get_existing_cluster(conn, opts, cluster_name,
+                                      die_on_error=False)
+  if any(active_nodes):
+    print >> stderr, ("ERROR: There are already instances running in " +
+        "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
+    sys.exit(1)
 
   # Figure out the latest AMI from our static URL
   if opts.ami == "latest":
@@ -221,55 +227,84 @@ def launch_cluster(conn, opts, cluster_name):
     # Launch spot instances with the requested price
     print ("Requesting %d slaves as spot instances with price $%.3f" %
            (opts.slaves, opts.spot_price))
-    slave_reqs = conn.request_spot_instances(
-        price = opts.spot_price,
-        image_id = opts.ami,
-        launch_group = "launch-group-%s" % cluster_name,
-        placement = opts.zone,
-        count = opts.slaves,
-        key_name = opts.key_pair,
-        security_groups = [slave_group],
-        instance_type = opts.instance_type,
-        block_device_map = block_map)
-    my_req_ids = [req.id for req in slave_reqs]
+    zones = get_zones(conn, opts)
+    num_zones = len(zones)
+    i = 0
+    my_req_ids = []
+    for zone in zones:
+      num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
+      slave_reqs = conn.request_spot_instances(
+          price = opts.spot_price,
+          image_id = opts.ami,
+          launch_group = "launch-group-%s" % cluster_name,
+          placement = zone,
+          count = num_slaves_this_zone,
+          key_name = opts.key_pair,
+          security_groups = [slave_group],
+          instance_type = opts.instance_type,
+          block_device_map = block_map)
+      my_req_ids += [req.id for req in slave_reqs]
+      i += 1
+
     print "Waiting for spot instances to be granted..."
-    while True:
-      time.sleep(10)
-      reqs = conn.get_all_spot_instance_requests()
-      id_to_req = {}
-      for r in reqs:
-        id_to_req[r.id] = r
-      active = 0
-      instance_ids = []
-      for i in my_req_ids:
-        if id_to_req[i].state == "active":
-          active += 1
-          instance_ids.append(id_to_req[i].instance_id)
-      if active == opts.slaves:
-        print "All %d slaves granted" % opts.slaves
-        reservations = conn.get_all_instances(instance_ids)
-        slave_nodes = []
-        for r in reservations:
-          slave_nodes += r.instances
-        break
-      else:
-        print "%d of %d slaves granted, waiting longer" % (active, opts.slaves)
+    try:
+      while True:
+        time.sleep(10)
+        reqs = conn.get_all_spot_instance_requests()
+        id_to_req = {}
+        for r in reqs:
+          id_to_req[r.id] = r
+        active_instance_ids = []
+        for i in my_req_ids:
+          if i in id_to_req and id_to_req[i].state == "active":
+            active_instance_ids.append(id_to_req[i].instance_id)
+        if len(active_instance_ids) == opts.slaves:
+          print "All %d slaves granted" % opts.slaves
+          reservations = conn.get_all_instances(active_instance_ids)
+          slave_nodes = []
+          for r in reservations:
+            slave_nodes += r.instances
+          break
+        else:
+          print "%d of %d slaves granted, waiting longer" % (
+            len(active_instance_ids), opts.slaves)
+    except:
+      print "Canceling spot instance requests"
+      conn.cancel_spot_instance_requests(my_req_ids)
+      # Log a warning if any of these requests actually launched instances:
+      (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+          conn, opts, cluster_name, die_on_error=False)
+      running = len(master_nodes) + len(slave_nodes) + len(zoo_nodes)
+      if running:
+        print >> stderr, ("WARNING: %d instances are still running" % running)
+      sys.exit(0)
   else:
     # Launch non-spot instances
-    slave_res = image.run(key_name = opts.key_pair,
-                          security_groups = [slave_group],
-                          instance_type = opts.instance_type,
-                          placement = opts.zone,
-                          min_count = opts.slaves,
-                          max_count = opts.slaves,
-                          block_device_map = block_map)
-    slave_nodes = slave_res.instances
-    print "Launched slaves, regid = " + slave_res.id
+    zones = get_zones(conn, opts)
+    num_zones = len(zones)
+    i = 0
+    slave_nodes = []
+    for zone in zones:
+      num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
+      if num_slaves_this_zone > 0:
+        slave_res = image.run(key_name = opts.key_pair,
+                              security_groups = [slave_group],
+                              instance_type = opts.instance_type,
+                              placement = zone,
+                              min_count = num_slaves_this_zone,
+                              max_count = num_slaves_this_zone,
+                              block_device_map = block_map)
+        slave_nodes += slave_res.instances
+        print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone,
+                                                        zone, slave_res.id)
+      i += 1
 
   # Launch masters
   master_type = opts.master_instance_type
   if master_type == "":
     master_type = opts.instance_type
+  if opts.zone == 'all':
+    opts.zone = random.choice(conn.get_all_zones()).name
   master_res = image.run(key_name = opts.key_pair,
                          security_groups = [master_group],
                          instance_type = master_type,
@@ -278,7 +313,7 @@ def launch_cluster(conn, opts, cluster_name):
                          max_count = 1,
                          block_device_map = block_map)
   master_nodes = master_res.instances
-  print "Launched master, regid = " + master_res.id
+  print "Launched master in %s, regid = %s" % (opts.zone, master_res.id)
 
   zoo_nodes = []
 
@@ -289,7 +324,7 @@ def launch_cluster(conn, opts, cluster_name):
 # Get the EC2 instances in an existing cluster if available.
 # Returns a tuple of lists of EC2 instance objects for the masters,
 # slaves and zookeeper nodes (in that order).
-def get_existing_cluster(conn, opts, cluster_name):
+def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
   print "Searching for existing cluster " + cluster_name + "..."
   reservations = conn.get_all_instances()
   master_nodes = []
@@ -305,9 +340,10 @@ def get_existing_cluster(conn, opts, cluster_name):
         slave_nodes += res.instances
       elif group_names == [cluster_name + "-zoo"]:
         zoo_nodes += res.instances
-  if master_nodes != [] and slave_nodes != []:
+  if any((master_nodes, slave_nodes, zoo_nodes)):
     print ("Found %d master(s), %d slaves, %d ZooKeeper nodes" %
            (len(master_nodes), len(slave_nodes), len(zoo_nodes)))
+  if (master_nodes != [] and slave_nodes != []) or not die_on_error:
     return (master_nodes, slave_nodes, zoo_nodes)
   else:
     if master_nodes == [] and slave_nodes != []:
@@ -329,6 +365,7 @@ def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_k
     print "Copying SSH key %s to master..." % opts.identity_file
     ssh(master, opts, 'mkdir -p ~/.ssh')
     scp(master, opts, opts.identity_file, '~/.ssh/id_rsa')
+    ssh(master, opts, 'chmod 600 ~/.ssh/id_rsa')
   print "Running setup on master..."
   if opts.cluster_type == "mesos":
     setup_mesos_cluster(master, opts)
@@ -467,9 +504,30 @@ def ssh(host, opts, command):
       (opts.identity_file, opts.user, host, command), shell=True)
 
 
+# Gets a list of zones to launch instances in
+def get_zones(conn, opts):
+  if opts.zone == 'all':
+    zones = [z.name for z in conn.get_all_zones()]
+  else:
+    zones = [opts.zone]
+  return zones
+
+
+# Gets the size of the partition at 0-based index current_partitions when total items are split over num_partitions
+def get_partition(total, num_partitions, current_partitions):
+  num_slaves_this_zone = total / num_partitions
+  if (total % num_partitions) - current_partitions > 0:
+    num_slaves_this_zone += 1
+  return num_slaves_this_zone
+
+
 def main():
   (opts, action, cluster_name) = parse_args()
-  conn = boto.ec2.connect_to_region(opts.region)
+  try:
+    conn = ec2.connect_to_region(opts.region)
+  except Exception as e:
+    print >> stderr, (e)
+    sys.exit(1)
 
   # Select an AZ at random if it was not specified.
   if opts.zone == "":
@@ -491,7 +549,7 @@ def main():
         "Destroy cluster " + cluster_name + " (y/N): ")
     if response == "y":
       (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
-          conn, opts, cluster_name)
+          conn, opts, cluster_name, die_on_error=False)
       print "Terminating master..."
       for inst in master_nodes:
         inst.terminate()
@@ -502,6 +560,48 @@ def main():
       print "Terminating zoo..."
       for inst in zoo_nodes:
         inst.terminate()
+
+      # Delete security groups as well
+      if opts.delete_groups:
+        print "Deleting security groups (this will take some time)..."
+        group_names = [cluster_name + "-master", cluster_name + "-slaves", cluster_name + "-zoo"]
+
+        attempt = 1
+        while attempt <= 3:
+          print "Attempt %d" % attempt
+          groups = [g for g in conn.get_all_security_groups() if g.name in group_names]
+          success = True
+          # Delete individual rules in all groups before deleting groups to
+          # remove dependencies between them
+          for group in groups:
+            print "Deleting rules in security group " + group.name
+            for rule in group.rules:
+              for grant in rule.grants:
+                  success &= group.revoke(ip_protocol=rule.ip_protocol,
+                           from_port=rule.from_port,
+                           to_port=rule.to_port,
+                           src_group=grant)
+
+          # Sleep for AWS eventual-consistency to catch up, and for instances
+          # to terminate
+          time.sleep(30)  # Yes, it does have to be this long :-(
+          for group in groups:
+            try:
+              conn.delete_security_group(group.name)
+              print "Deleted security group " + group.name
+            except boto.exception.EC2ResponseError:
+              success = False
+              print "Failed to delete security group " + group.name
+
+          # Unfortunately, group.revoke() returns True even if a rule was not
+          # deleted, so this needs to be rerun if something fails
+          if success: break
+
+          attempt += 1
+
+        if not success:
+          print "Failed to delete all security groups after 3 tries."
+          print "Try re-running in a few minutes."
 
   elif action == "login":
     (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
@@ -526,7 +626,7 @@ def main():
         "Stop cluster " + cluster_name + " (y/N): ")
     if response == "y":
       (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
-          conn, opts, cluster_name)
+          conn, opts, cluster_name, die_on_error=False)
       print "Stopping master..."
       for inst in master_nodes:
         if inst.state not in ["shutting-down", "terminated"]: