aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/ec2-scripts.md14
-rwxr-xr-xec2/spark_ec2.py71
2 files changed, 57 insertions, 28 deletions
diff --git a/docs/ec2-scripts.md b/docs/ec2-scripts.md
index 156a727026..f5ac6d894e 100644
--- a/docs/ec2-scripts.md
+++ b/docs/ec2-scripts.md
@@ -12,14 +12,16 @@ on the [Amazon Web Services site](http://aws.amazon.com/).
`spark-ec2` is designed to manage multiple named clusters. You can
launch a new cluster (telling the script its size and giving it a name),
-shutdown an existing cluster, or log into a cluster. Each cluster is
-identified by placing its machines into EC2 security groups whose names
-are derived from the name of the cluster. For example, a cluster named
+shutdown an existing cluster, or log into a cluster. Each cluster
+launches a set of instances, which are tagged with the cluster name,
+and placed into EC2 security groups. If you don't specify a security
+group, the `spark-ec2` script will create security groups based on the
+cluster name you request. For example, a cluster named
`test` will contain a master node in a security group called
`test-master`, and a number of slave nodes in a security group called
-`test-slaves`. The `spark-ec2` script will create these security groups
-for you based on the cluster name you request. You can also use them to
-identify machines belonging to each cluster in the Amazon EC2 Console.
+`test-slaves`. You can also specify a security group prefix to be used
+in place of the cluster name. Machines in a cluster can be identified
+by looking for the "Name" tag of the instance in the Amazon EC2 Console.
# Before You Start
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 0c2f85a386..3a8c816cff 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -124,7 +124,7 @@ def parse_args():
help="The SSH user you want to connect as (default: root)")
parser.add_option(
"--delete-groups", action="store_true", default=False,
- help="When destroying a cluster, delete the security groups that were created")
+ help="When destroying a cluster, delete the security groups that were created.")
parser.add_option(
"--use-existing-master", action="store_true", default=False,
help="Launch fresh slaves, but use an existing stopped master if possible")
@@ -138,7 +138,9 @@ def parse_args():
parser.add_option(
"--user-data", type="string", default="",
help="Path to a user-data file (most AMI's interpret this as an initialization script)")
-
+ parser.add_option(
+ "--security-group-prefix", type="string", default=None,
+ help="Use this prefix for the security group rather than the cluster name.")
(opts, args) = parser.parse_args()
if len(args) != 2:
@@ -285,8 +287,12 @@ def launch_cluster(conn, opts, cluster_name):
user_data_content = user_data_file.read()
print "Setting up security groups..."
- master_group = get_or_make_group(conn, cluster_name + "-master")
- slave_group = get_or_make_group(conn, cluster_name + "-slaves")
+ if opts.security_group_prefix is None:
+ master_group = get_or_make_group(conn, cluster_name + "-master")
+ slave_group = get_or_make_group(conn, cluster_name + "-slaves")
+ else:
+ master_group = get_or_make_group(conn, opts.security_group_prefix + "-master")
+ slave_group = get_or_make_group(conn, opts.security_group_prefix + "-slaves")
if master_group.rules == []: # Group was just now created
master_group.authorize(src_group=master_group)
master_group.authorize(src_group=slave_group)
@@ -310,12 +316,11 @@ def launch_cluster(conn, opts, cluster_name):
slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
- # Check if instances are already running in our groups
+ # Check if instances are already running with the cluster name
existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
die_on_error=False)
if existing_slaves or (existing_masters and not opts.use_existing_master):
- print >> stderr, ("ERROR: There are already instances running in " +
- "group %s or %s" % (master_group.name, slave_group.name))
+ print >> stderr, ("ERROR: There are already instances for name: %s " % cluster_name)
sys.exit(1)
# Figure out Spark AMI
@@ -371,9 +376,13 @@ def launch_cluster(conn, opts, cluster_name):
for r in reqs:
id_to_req[r.id] = r
active_instance_ids = []
+ outstanding_request_ids = []
for i in my_req_ids:
- if i in id_to_req and id_to_req[i].state == "active":
- active_instance_ids.append(id_to_req[i].instance_id)
+ if i in id_to_req:
+ if id_to_req[i].state == "active":
+ active_instance_ids.append(id_to_req[i].instance_id)
+ else:
+ outstanding_request_ids.append(i)
if len(active_instance_ids) == opts.slaves:
print "All %d slaves granted" % opts.slaves
reservations = conn.get_all_instances(active_instance_ids)
@@ -382,8 +391,8 @@ def launch_cluster(conn, opts, cluster_name):
slave_nodes += r.instances
break
else:
- print "%d of %d slaves granted, waiting longer" % (
- len(active_instance_ids), opts.slaves)
+ print "%d of %d slaves granted, waiting longer for request ids including %s" % (
+ len(active_instance_ids), opts.slaves, outstanding_request_ids[0:10])
except:
print "Canceling spot instance requests"
conn.cancel_spot_instance_requests(my_req_ids)
@@ -440,14 +449,29 @@ def launch_cluster(conn, opts, cluster_name):
print "Launched master in %s, regid = %s" % (zone, master_res.id)
# Give the instances descriptive names
+ # TODO: Add retry logic for tagging with name since it's used to identify a cluster.
for master in master_nodes:
- master.add_tag(
- key='Name',
- value='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id))
+ name = '{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)
+ for i in range(0, 5):
+ try:
+ master.add_tag(key='Name', value=name)
+ except:
+ print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
+ if (i == 5):
+ raise "Error - failed max attempts to add name tag"
+ time.sleep(5)
+
+
for slave in slave_nodes:
- slave.add_tag(
- key='Name',
- value='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id))
+ name = '{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)
+ for i in range(0, 5):
+ try:
+ slave.add_tag(key='Name', value=name)
+ except:
+ print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
+ if (i == 5):
+ raise "Error - failed max attempts to add name tag"
+ time.sleep(5)
# Return all the instances
return (master_nodes, slave_nodes)
@@ -463,10 +487,10 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
for res in reservations:
active = [i for i in res.instances if is_active(i)]
for inst in active:
- group_names = [g.name for g in inst.groups]
- if group_names == [cluster_name + "-master"]:
+ name = inst.tags.get(u'Name', "")
+ if name.startswith(cluster_name + "-master"):
master_nodes.append(inst)
- elif group_names == [cluster_name + "-slaves"]:
+ elif name.startswith(cluster_name + "-slave"):
slave_nodes.append(inst)
if any((master_nodes, slave_nodes)):
print ("Found %d master(s), %d slaves" % (len(master_nodes), len(slave_nodes)))
@@ -474,7 +498,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
return (master_nodes, slave_nodes)
else:
if master_nodes == [] and slave_nodes != []:
- print >> sys.stderr, "ERROR: Could not find master in group " + cluster_name + "-master"
+ print >> sys.stderr, "ERROR: Could not find master in with name " + cluster_name + "-master"
else:
print >> sys.stderr, "ERROR: Could not find any existing cluster"
sys.exit(1)
@@ -816,7 +840,10 @@ def real_main():
# Delete security groups as well
if opts.delete_groups:
print "Deleting security groups (this will take some time)..."
- group_names = [cluster_name + "-master", cluster_name + "-slaves"]
+ if opts.security_group_prefix is None:
+ group_names = [cluster_name + "-master", cluster_name + "-slaves"]
+ else:
+ group_names = [opts.security_group_prefix + "-master", opts.security_group_prefix + "-slaves"]
attempt = 1
while attempt <= 3: