From 32442ee1e109d834d2359506f0161df8df8caf03 Mon Sep 17 00:00:00 2001
From: Peter Sankauskas
Date: Fri, 16 Nov 2012 17:25:28 -0800
Subject: Giving the Spark EC2 script the ability to launch instances spread
 across multiple availability zones in order to make the cluster more
 resilient to failure

---
 ec2/spark_ec2.py | 80 ++++++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 58 insertions(+), 22 deletions(-)

(limited to 'ec2/spark_ec2.py')

diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 2ca4d8020d..a3138d6ef7 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -61,7 +61,8 @@ def parse_args():
   parser.add_option("-r", "--region", default="us-east-1",
       help="EC2 region zone to launch instances in")
   parser.add_option("-z", "--zone", default="",
-      help="Availability zone to launch instances in")
+      help="Availability zone to launch instances in, or 'all' to spread " +
+           "slaves across multiple")
   parser.add_option("-a", "--ami", default="latest",
       help="Amazon Machine Image ID to use, or 'latest' to use latest " +
            "available AMI (default: latest)")
@@ -217,17 +218,25 @@ def launch_cluster(conn, opts, cluster_name):
     # Launch spot instances with the requested price
     print ("Requesting %d slaves as spot instances with price $%.3f" %
            (opts.slaves, opts.spot_price))
-    slave_reqs = conn.request_spot_instances(
-        price = opts.spot_price,
-        image_id = opts.ami,
-        launch_group = "launch-group-%s" % cluster_name,
-        placement = opts.zone,
-        count = opts.slaves,
-        key_name = opts.key_pair,
-        security_groups = [slave_group],
-        instance_type = opts.instance_type,
-        block_device_map = block_map)
-    my_req_ids = [req.id for req in slave_reqs]
+    zones = get_zones(conn, opts)
+    num_zones = len(zones)
+    i = 0
+    my_req_ids = []
+    for zone in zones:
+      num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
+      slave_reqs = conn.request_spot_instances(
+          price = opts.spot_price,
+          image_id = opts.ami,
+          launch_group = "launch-group-%s" % cluster_name,
+          placement = zone,
+          count = num_slaves_this_zone,
+          key_name = opts.key_pair,
+          security_groups = [slave_group],
+          instance_type = opts.instance_type,
+          block_device_map = block_map)
+      my_req_ids += [req.id for req in slave_reqs]
+      i += 1
+
     print "Waiting for spot instances to be granted..."
     try:
       while True:
@@ -262,20 +271,30 @@ def launch_cluster(conn, opts, cluster_name):
       sys.exit(0)
   else:
     # Launch non-spot instances
-    slave_res = image.run(key_name = opts.key_pair,
-                          security_groups = [slave_group],
-                          instance_type = opts.instance_type,
-                          placement = opts.zone,
-                          min_count = opts.slaves,
-                          max_count = opts.slaves,
-                          block_device_map = block_map)
-    slave_nodes = slave_res.instances
-    print "Launched slaves, regid = " + slave_res.id
+    zones = get_zones(conn, opts)
+    num_zones = len(zones)
+    i = 0
+    slave_nodes = []
+    for zone in zones:
+      num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
+      slave_res = image.run(key_name = opts.key_pair,
+                            security_groups = [slave_group],
+                            instance_type = opts.instance_type,
+                            placement = zone,
+                            min_count = num_slaves_this_zone,
+                            max_count = num_slaves_this_zone,
+                            block_device_map = block_map)
+      slave_nodes += slave_res.instances
+      print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone,
+                                                      zone, slave_res.id)
+      i += 1

   # Launch masters
   master_type = opts.master_instance_type
   if master_type == "":
     master_type = opts.instance_type
+  if opts.zone == 'all':
+    opts.zone = random.choice(conn.get_all_zones()).name
   master_res = image.run(key_name = opts.key_pair,
                          security_groups = [master_group],
                          instance_type = master_type,
@@ -284,7 +303,7 @@ def launch_cluster(conn, opts, cluster_name):
                          max_count = 1,
                          block_device_map = block_map)
   master_nodes = master_res.instances
-  print "Launched master, regid = " + master_res.id
+  print "Launched master in %s, regid = %s" % (opts.zone, master_res.id)

   zoo_nodes = []

@@ -474,6 +493,23 @@ def ssh(host, opts, command):
       (opts.identity_file, opts.user, host, command), shell=True)


+# Gets a list of zones to launch instances in
+def get_zones(conn, opts):
+  if opts.zone == 'all':
+    zones = [z.name for z in conn.get_all_zones()]
+  else:
+    zones = [opts.zone]
+  return zones
+
+
+# Gets the number of items in a partition
+def get_partition(total, num_partitions, current_partitions):
+  num_slaves_this_zone = total / num_partitions
+  if (total % num_partitions) - current_partitions > 0:
+    num_slaves_this_zone += 1
+  return num_slaves_this_zone
+
+
 def main():
   (opts, action, cluster_name) = parse_args()
   conn = boto.ec2.connect_to_region(opts.region)
--
cgit v1.2.3


From 606d252d264b75943983915b20a8d0e7a8a7d20f Mon Sep 17 00:00:00 2001
From: Peter Sankauskas
Date: Sat, 17 Nov 2012 23:09:11 -0800
Subject: Adding comment about additional bandwidth charges

---
 ec2/spark_ec2.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

(limited to 'ec2/spark_ec2.py')

diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index a3138d6ef7..2f48439549 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -62,7 +62,8 @@ def parse_args():
       help="EC2 region zone to launch instances in")
   parser.add_option("-z", "--zone", default="",
       help="Availability zone to launch instances in, or 'all' to spread " +
-           "slaves across multiple")
+           "slaves across multiple (an additional $0.01/GB for bandwidth " +
+           "between zones applies)")
   parser.add_option("-a", "--ami", default="latest",
       help="Amazon Machine Image ID to use, or 'latest' to use latest " +
            "available AMI (default: latest)")
--
cgit v1.2.3
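
A note on how the new helpers divide the slave count when --zone all is used:
get_partition() gives every zone total / num_partitions slaves and hands the
remainder out one slave at a time to the lowest-numbered zones. The sketch
below is a standalone illustration of that arithmetic, not part of the patch;
the zone names and the count of 10 slaves are made-up values for the example,
and // is used so the integer division is explicit outside Python 2.

    # Standalone sketch of the partitioning done by get_zones()/get_partition().
    def get_partition(total, num_partitions, current_partitions):
        # Base share per zone (the patch's Python 2 "/" is the same integer division).
        num_slaves_this_zone = total // num_partitions
        # The first (total % num_partitions) zones each absorb one leftover slave.
        if (total % num_partitions) - current_partitions > 0:
            num_slaves_this_zone += 1
        return num_slaves_this_zone

    zones = ["us-east-1a", "us-east-1b", "us-east-1c"]  # hypothetical zone names
    counts = [get_partition(10, len(zones), i) for i in range(len(zones))]
    print(counts)  # -> [4, 3, 3], which sums back to the 10 requested slaves

Each entry of counts is what the patch passes as count (spot requests) or
min_count/max_count (on-demand image.run) for one per-zone launch, so every
individual request stays inside a single availability zone.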