aboutsummaryrefslogtreecommitdiff
path: root/ec2
diff options
context:
space:
mode:
authorVida Ha <vida@databricks.com>2014-08-19 13:35:05 -0700
committerJosh Rosen <joshrosen@apache.org>2014-08-19 13:35:05 -0700
commit94053a7b766788bb62e2dbbf352ccbcc75f71fc0 (patch)
treec18c3cee807527cee9e8d3985ca21dbf748cbd90 /ec2
parent31f0b071efd0b63eb9d6a6a131e5c4fa28237583 (diff)
downloadspark-94053a7b766788bb62e2dbbf352ccbcc75f71fc0.tar.gz
spark-94053a7b766788bb62e2dbbf352ccbcc75f71fc0.tar.bz2
spark-94053a7b766788bb62e2dbbf352ccbcc75f71fc0.zip
SPARK-2333 - spark_ec2 script should allow option for existing security group
- Uses the name tag to identify machines in a cluster. - Allows overriding the security group name so it doesn't need to coincide with the cluster name. - Outputs the request id's of up to 10 pending spot instance requests. Author: Vida Ha <vida@databricks.com> Closes #1899 from vidaha/vida/ec2-reuse-security-group and squashes the following commits: c80d5c3 [Vida Ha] wrap retries in a try catch block b2989d5 [Vida Ha] SPARK-2333: spark_ec2 script should allow option for existing security group
Diffstat (limited to 'ec2')
-rwxr-xr-xec2/spark_ec2.py71
1 files changed, 49 insertions, 22 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 0c2f85a386..3a8c816cff 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -124,7 +124,7 @@ def parse_args():
help="The SSH user you want to connect as (default: root)")
parser.add_option(
"--delete-groups", action="store_true", default=False,
- help="When destroying a cluster, delete the security groups that were created")
+ help="When destroying a cluster, delete the security groups that were created.")
parser.add_option(
"--use-existing-master", action="store_true", default=False,
help="Launch fresh slaves, but use an existing stopped master if possible")
@@ -138,7 +138,9 @@ def parse_args():
parser.add_option(
"--user-data", type="string", default="",
help="Path to a user-data file (most AMI's interpret this as an initialization script)")
-
+ parser.add_option(
+ "--security-group-prefix", type="string", default=None,
+ help="Use this prefix for the security group rather than the cluster name.")
(opts, args) = parser.parse_args()
if len(args) != 2:
@@ -285,8 +287,12 @@ def launch_cluster(conn, opts, cluster_name):
user_data_content = user_data_file.read()
print "Setting up security groups..."
- master_group = get_or_make_group(conn, cluster_name + "-master")
- slave_group = get_or_make_group(conn, cluster_name + "-slaves")
+ if opts.security_group_prefix is None:
+ master_group = get_or_make_group(conn, cluster_name + "-master")
+ slave_group = get_or_make_group(conn, cluster_name + "-slaves")
+ else:
+ master_group = get_or_make_group(conn, opts.security_group_prefix + "-master")
+ slave_group = get_or_make_group(conn, opts.security_group_prefix + "-slaves")
if master_group.rules == []: # Group was just now created
master_group.authorize(src_group=master_group)
master_group.authorize(src_group=slave_group)
@@ -310,12 +316,11 @@ def launch_cluster(conn, opts, cluster_name):
slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
- # Check if instances are already running in our groups
+ # Check if instances are already running with the cluster name
existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
die_on_error=False)
if existing_slaves or (existing_masters and not opts.use_existing_master):
- print >> stderr, ("ERROR: There are already instances running in " +
- "group %s or %s" % (master_group.name, slave_group.name))
+ print >> stderr, ("ERROR: There are already instances for name: %s " % cluster_name)
sys.exit(1)
# Figure out Spark AMI
@@ -371,9 +376,13 @@ def launch_cluster(conn, opts, cluster_name):
for r in reqs:
id_to_req[r.id] = r
active_instance_ids = []
+ outstanding_request_ids = []
for i in my_req_ids:
- if i in id_to_req and id_to_req[i].state == "active":
- active_instance_ids.append(id_to_req[i].instance_id)
+ if i in id_to_req:
+ if id_to_req[i].state == "active":
+ active_instance_ids.append(id_to_req[i].instance_id)
+ else:
+ outstanding_request_ids.append(i)
if len(active_instance_ids) == opts.slaves:
print "All %d slaves granted" % opts.slaves
reservations = conn.get_all_instances(active_instance_ids)
@@ -382,8 +391,8 @@ def launch_cluster(conn, opts, cluster_name):
slave_nodes += r.instances
break
else:
- print "%d of %d slaves granted, waiting longer" % (
- len(active_instance_ids), opts.slaves)
+ print "%d of %d slaves granted, waiting longer for request ids including %s" % (
+ len(active_instance_ids), opts.slaves, outstanding_request_ids[0:10])
except:
print "Canceling spot instance requests"
conn.cancel_spot_instance_requests(my_req_ids)
@@ -440,14 +449,29 @@ def launch_cluster(conn, opts, cluster_name):
print "Launched master in %s, regid = %s" % (zone, master_res.id)
# Give the instances descriptive names
+ # TODO: Add retry logic for tagging with name since it's used to identify a cluster.
for master in master_nodes:
- master.add_tag(
- key='Name',
- value='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id))
+ name = '{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)
+ for i in range(0, 5):
+ try:
+ master.add_tag(key='Name', value=name)
+ except:
+ print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
+ if (i == 5):
+ raise "Error - failed max attempts to add name tag"
+ time.sleep(5)
+
+
for slave in slave_nodes:
- slave.add_tag(
- key='Name',
- value='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id))
+ name = '{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)
+ for i in range(0, 5):
+ try:
+ slave.add_tag(key='Name', value=name)
+ except:
+ print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
+ if (i == 5):
+ raise "Error - failed max attempts to add name tag"
+ time.sleep(5)
# Return all the instances
return (master_nodes, slave_nodes)
@@ -463,10 +487,10 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
for res in reservations:
active = [i for i in res.instances if is_active(i)]
for inst in active:
- group_names = [g.name for g in inst.groups]
- if group_names == [cluster_name + "-master"]:
+ name = inst.tags.get(u'Name', "")
+ if name.startswith(cluster_name + "-master"):
master_nodes.append(inst)
- elif group_names == [cluster_name + "-slaves"]:
+ elif name.startswith(cluster_name + "-slave"):
slave_nodes.append(inst)
if any((master_nodes, slave_nodes)):
print ("Found %d master(s), %d slaves" % (len(master_nodes), len(slave_nodes)))
@@ -474,7 +498,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
return (master_nodes, slave_nodes)
else:
if master_nodes == [] and slave_nodes != []:
- print >> sys.stderr, "ERROR: Could not find master in group " + cluster_name + "-master"
+ print >> sys.stderr, "ERROR: Could not find master in with name " + cluster_name + "-master"
else:
print >> sys.stderr, "ERROR: Could not find any existing cluster"
sys.exit(1)
@@ -816,7 +840,10 @@ def real_main():
# Delete security groups as well
if opts.delete_groups:
print "Deleting security groups (this will take some time)..."
- group_names = [cluster_name + "-master", cluster_name + "-slaves"]
+ if opts.security_group_prefix is None:
+ group_names = [cluster_name + "-master", cluster_name + "-slaves"]
+ else:
+ group_names = [opts.security_group_prefix + "-master", opts.security_group_prefix + "-slaves"]
attempt = 1
while attempt <= 3: