aboutsummaryrefslogtreecommitdiff
path: root/ec2/spark_ec2.py
diff options
context:
space:
mode:
Diffstat (limited to 'ec2/spark_ec2.py')
-rwxr-xr-xec2/spark_ec2.py194
1 files changed, 130 insertions, 64 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 6a3647b218..2ab11dbd34 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -19,7 +19,6 @@
from __future__ import with_statement
-import boto
import logging
import os
import random
@@ -32,7 +31,7 @@ import urllib2
from optparse import OptionParser
from sys import stderr
from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
-
+from boto import ec2
# A static URL from which to figure out the latest Mesos EC2 AMI
LATEST_AMI_URL = "https://s3.amazonaws.com/mesos-images/ids/latest-spark-0.6"
@@ -61,7 +60,9 @@ def parse_args():
parser.add_option("-r", "--region", default="us-east-1",
help="EC2 region zone to launch instances in")
parser.add_option("-z", "--zone", default="",
- help="Availability zone to launch instances in")
+ help="Availability zone to launch instances in, or 'all' to spread " +
+ "slaves across multiple (an additional $0.01/Gb for bandwidth" +
+ "between zones applies)")
parser.add_option("-a", "--ami", default="latest",
help="Amazon Machine Image ID to use, or 'latest' to use latest " +
"available AMI (default: latest)")
@@ -97,14 +98,20 @@ def parse_args():
if opts.cluster_type not in ["mesos", "standalone"] and action == "launch":
print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type)
sys.exit(1)
- if os.getenv('AWS_ACCESS_KEY_ID') == None:
- print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " +
- "must be set")
- sys.exit(1)
- if os.getenv('AWS_SECRET_ACCESS_KEY') == None:
- print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " +
- "must be set")
- sys.exit(1)
+
+ # Boto config check
+ # http://boto.cloudhackers.com/en/latest/boto_config_tut.html
+ home_dir = os.getenv('HOME')
+ if home_dir == None or not os.path.isfile(home_dir + '/.boto'):
+ if not os.path.isfile('/etc/boto.cfg'):
+ if os.getenv('AWS_ACCESS_KEY_ID') == None:
+ print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " +
+ "must be set")
+ sys.exit(1)
+ if os.getenv('AWS_SECRET_ACCESS_KEY') == None:
+ print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " +
+ "must be set")
+ sys.exit(1)
return (opts, action, cluster_name)
@@ -180,16 +187,12 @@ def launch_cluster(conn, opts, cluster_name):
zoo_group.authorize('tcp', 3888, 3888, '0.0.0.0/0')
# Check if instances are already running in our groups
- print "Checking for running cluster..."
- reservations = conn.get_all_instances()
- for res in reservations:
- group_names = [g.id for g in res.groups]
- if master_group.name in group_names or slave_group.name in group_names or zoo_group.name in group_names:
- active = [i for i in res.instances if is_active(i)]
- if len(active) > 0:
- print >> stderr, ("ERROR: There are already instances running in " +
- "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
- sys.exit(1)
+ active_nodes = get_existing_cluster(conn, opts, cluster_name,
+ die_on_error=False)
+ if any(active_nodes):
+ print >> stderr, ("ERROR: There are already instances running in " +
+ "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
+ sys.exit(1)
# Figure out the latest AMI from our static URL
if opts.ami == "latest":
@@ -221,55 +224,83 @@ def launch_cluster(conn, opts, cluster_name):
# Launch spot instances with the requested price
print ("Requesting %d slaves as spot instances with price $%.3f" %
(opts.slaves, opts.spot_price))
- slave_reqs = conn.request_spot_instances(
- price = opts.spot_price,
- image_id = opts.ami,
- launch_group = "launch-group-%s" % cluster_name,
- placement = opts.zone,
- count = opts.slaves,
- key_name = opts.key_pair,
- security_groups = [slave_group],
- instance_type = opts.instance_type,
- block_device_map = block_map)
- my_req_ids = [req.id for req in slave_reqs]
+ zones = get_zones(conn, opts)
+ num_zones = len(zones)
+ i = 0
+ my_req_ids = []
+ for zone in zones:
+ num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
+ slave_reqs = conn.request_spot_instances(
+ price = opts.spot_price,
+ image_id = opts.ami,
+ launch_group = "launch-group-%s" % cluster_name,
+ placement = zone,
+ count = num_slaves_this_zone,
+ key_name = opts.key_pair,
+ security_groups = [slave_group],
+ instance_type = opts.instance_type,
+ block_device_map = block_map)
+ my_req_ids += [req.id for req in slave_reqs]
+ i += 1
+
print "Waiting for spot instances to be granted..."
- while True:
- time.sleep(10)
- reqs = conn.get_all_spot_instance_requests()
- id_to_req = {}
- for r in reqs:
- id_to_req[r.id] = r
- active = 0
- instance_ids = []
- for i in my_req_ids:
- if id_to_req[i].state == "active":
- active += 1
- instance_ids.append(id_to_req[i].instance_id)
- if active == opts.slaves:
- print "All %d slaves granted" % opts.slaves
- reservations = conn.get_all_instances(instance_ids)
- slave_nodes = []
- for r in reservations:
- slave_nodes += r.instances
- break
- else:
- print "%d of %d slaves granted, waiting longer" % (active, opts.slaves)
+ try:
+ while True:
+ time.sleep(10)
+ reqs = conn.get_all_spot_instance_requests()
+ id_to_req = {}
+ for r in reqs:
+ id_to_req[r.id] = r
+ active_instance_ids = []
+ for i in my_req_ids:
+ if i in id_to_req and id_to_req[i].state == "active":
+ active_instance_ids.append(id_to_req[i].instance_id)
+ if len(active_instance_ids) == opts.slaves:
+ print "All %d slaves granted" % opts.slaves
+ reservations = conn.get_all_instances(active_instance_ids)
+ slave_nodes = []
+ for r in reservations:
+ slave_nodes += r.instances
+ break
+ else:
+ print "%d of %d slaves granted, waiting longer" % (
+ len(active_instance_ids), opts.slaves)
+ except:
+ print "Canceling spot instance requests"
+ conn.cancel_spot_instance_requests(my_req_ids)
+ # Log a warning if any of these requests actually launched instances:
+ (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ conn, opts, cluster_name, die_on_error=False)
+ running = len(master_nodes) + len(slave_nodes) + len(zoo_nodes)
+ if running:
+ print >> stderr, ("WARNING: %d instances are still running" % running)
+ sys.exit(0)
else:
# Launch non-spot instances
- slave_res = image.run(key_name = opts.key_pair,
- security_groups = [slave_group],
- instance_type = opts.instance_type,
- placement = opts.zone,
- min_count = opts.slaves,
- max_count = opts.slaves,
- block_device_map = block_map)
- slave_nodes = slave_res.instances
- print "Launched slaves, regid = " + slave_res.id
+ zones = get_zones(conn, opts)
+ num_zones = len(zones)
+ i = 0
+ slave_nodes = []
+ for zone in zones:
+ num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
+ slave_res = image.run(key_name = opts.key_pair,
+ security_groups = [slave_group],
+ instance_type = opts.instance_type,
+ placement = zone,
+ min_count = num_slaves_this_zone,
+ max_count = num_slaves_this_zone,
+ block_device_map = block_map)
+ slave_nodes += slave_res.instances
+ print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone,
+ zone, slave_res.id)
+ i += 1
# Launch masters
master_type = opts.master_instance_type
if master_type == "":
master_type = opts.instance_type
+ if opts.zone == 'all':
+ opts.zone = random.choice(conn.get_all_zones()).name
master_res = image.run(key_name = opts.key_pair,
security_groups = [master_group],
instance_type = master_type,
@@ -278,7 +309,7 @@ def launch_cluster(conn, opts, cluster_name):
max_count = 1,
block_device_map = block_map)
master_nodes = master_res.instances
- print "Launched master, regid = " + master_res.id
+ print "Launched master in %s, regid = %s" % (zone, master_res.id)
zoo_nodes = []
@@ -468,9 +499,30 @@ def ssh(host, opts, command):
(opts.identity_file, opts.user, host, command), shell=True)
+# Gets a list of zones to launch instances in
+def get_zones(conn, opts):
+ if opts.zone == 'all':
+ zones = [z.name for z in conn.get_all_zones()]
+ else:
+ zones = [opts.zone]
+ return zones
+
+
+# Gets the number of items in a partition
+def get_partition(total, num_partitions, current_partitions):
+ num_slaves_this_zone = total / num_partitions
+ if (total % num_partitions) - current_partitions > 0:
+ num_slaves_this_zone += 1
+ return num_slaves_this_zone
+
+
def main():
(opts, action, cluster_name) = parse_args()
- conn = boto.ec2.connect_to_region(opts.region)
+ try:
+ conn = ec2.connect_to_region(opts.region)
+ except Exception as e:
+ print >> stderr, (e)
+ sys.exit(1)
# Select an AZ at random if it was not specified.
if opts.zone == "":
@@ -503,6 +555,20 @@ def main():
print "Terminating zoo..."
for inst in zoo_nodes:
inst.terminate()
+ # Delete security groups as well
+ group_names = [cluster_name + "-master", cluster_name + "-slaves", cluster_name + "-zoo"]
+ groups = conn.get_all_security_groups()
+ for group in groups:
+ if group.name in group_names:
+ print "Deleting security group " + group.name
+ # Delete individual rules before deleting group to remove dependencies
+ for rule in group.rules:
+ for grant in rule.grants:
+ group.revoke(ip_protocol=rule.ip_protocol,
+ from_port=rule.from_port,
+ to_port=rule.to_port,
+ src_group=grant)
+ conn.delete_security_group(group.name)
elif action == "login":
(master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(