diff options
author | Vida Ha <vida@databricks.com> | 2014-08-27 14:26:06 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2014-08-27 14:26:06 -0700 |
commit | 7faf755ae4f0cf510048e432340260a6e609066d (patch) | |
tree | 9e7a332f6b19c4fe30981328aa7c8b2aed4ec93c | |
parent | 4fa2fda88fc7beebb579ba808e400113b512533b (diff) | |
download | spark-7faf755ae4f0cf510048e432340260a6e609066d.tar.gz spark-7faf755ae4f0cf510048e432340260a6e609066d.tar.bz2 spark-7faf755ae4f0cf510048e432340260a6e609066d.zip |
Spark-3213 Fixes issue with spark-ec2 not detecting slaves created with "Launch More like this"
... copy the spark_cluster_tag from a spot instance requests over to the instances.
Author: Vida Ha <vida@databricks.com>
Closes #2163 from vidaha/vida/spark-3213 and squashes the following commits:
5070a70 [Vida Ha] Spark-3214 Fix issue with spark-ec2 not detecting slaves created with 'Launch More Like This' and using Spot Requests
-rwxr-xr-x | ec2/spark_ec2.py | 45 |
1 files changed, 25 insertions, 20 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index ddd72a0f86..ae4c488f83 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -40,7 +40,6 @@ from boto import ec2 # A URL prefix from which to fetch AMI information AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list" - class UsageError(Exception): pass @@ -464,38 +463,45 @@ def launch_cluster(conn, opts, cluster_name): print "Launched master in %s, regid = %s" % (zone, master_res.id) # Give the instances descriptive names - # TODO: Add retry logic for tagging with name since it's used to identify a cluster. for master in master_nodes: name = '{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id) - for i in range(0, 5): - try: - master.add_tag(key='Name', value=name) - except: - print "Failed attempt %i of 5 to tag %s" % ((i + 1), name) - if (i == 5): - raise "Error - failed max attempts to add name tag" - time.sleep(5) - + tag_instance(master, name) for slave in slave_nodes: name = '{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id) - for i in range(0, 5): - try: - slave.add_tag(key='Name', value=name) - except: - print "Failed attempt %i of 5 to tag %s" % ((i + 1), name) - if (i == 5): - raise "Error - failed max attempts to add name tag" - time.sleep(5) + tag_instance(slave, name) # Return all the instances return (master_nodes, slave_nodes) +def tag_instance(instance, name): + for i in range(0, 5): + try: + instance.add_tag(key='Name', value=name) + except: + print "Failed attempt %i of 5 to tag %s" % ((i + 1), name) + if (i == 5): + raise "Error - failed max attempts to add name tag" + time.sleep(5) # Get the EC2 instances in an existing cluster if available. # Returns a tuple of lists of EC2 instance objects for the masters and slaves def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): print "Searching for existing cluster " + cluster_name + "..." + # Search all the spot instance requests, and copy any tags from the spot instance request to the cluster. + spot_instance_requests = conn.get_all_spot_instance_requests() + for req in spot_instance_requests: + if req.state != u'active': + continue + name = req.tags.get(u'Name', "") + if name.startswith(cluster_name): + reservations = conn.get_all_instances(instance_ids=[req.instance_id]) + for res in reservations: + active = [i for i in res.instances if is_active(i)] + for instance in active: + if (instance.tags.get(u'Name') == None): + tag_instance(instance, name) + # Now proceed to detect master and slaves instances. reservations = conn.get_all_instances() master_nodes = [] slave_nodes = [] @@ -518,7 +524,6 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): print >> sys.stderr, "ERROR: Could not find any existing cluster" sys.exit(1) - # Deploy configuration files and run setup scripts on a newly launched # or started EC2 cluster. def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): |