diff options
author | Xiangrui Meng <meng@databricks.com> | 2014-11-25 16:07:09 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2014-11-25 16:07:09 -0800 |
commit | 7eba0fbe456c451122d7a2353ff0beca00f15223 (patch) | |
tree | ca88df11d76b55975c5d5ab5e772bcf2ff3120bc /ec2 | |
parent | 9bdf5da59036c0b052df756fc4a28d64677072e7 (diff) | |
download | spark-7eba0fbe456c451122d7a2353ff0beca00f15223.tar.gz spark-7eba0fbe456c451122d7a2353ff0beca00f15223.tar.bz2 spark-7eba0fbe456c451122d7a2353ff0beca00f15223.zip |
[Spark-4509] Revert EC2 tag-based cluster membership patch
This PR reverts changes related to tag-based cluster membership. As discussed in SPARK-3332, we didn't figure out a safe strategy to use tags to determine cluster membership, because tagging is not atomic. The following changes are reverted:
SPARK-2333: 94053a7b766788bb62e2dbbf352ccbcc75f71fc0
SPARK-3213: 7faf755ae4f0cf510048e432340260a6e609066d
SPARK-3608: 78d4220fa0bf2f9ee663e34bbf3544a5313b02f0.
I tested launch, login, and destroy. It is easy to check the diff by comparing it to Josh's patch for branch-1.1:
https://github.com/apache/spark/pull/2225/files
JoshRosen I sent the PR to master. It might be easier for us to keep master and branch-1.2 the same at this time. We can always re-apply the patch once we figure out a stable solution.
Author: Xiangrui Meng <meng@databricks.com>
Closes #3453 from mengxr/SPARK-4509 and squashes the following commits:
f0b708b [Xiangrui Meng] revert 94053a7b766788bb62e2dbbf352ccbcc75f71fc0
4298ea5 [Xiangrui Meng] revert 7faf755ae4f0cf510048e432340260a6e609066d
35963a1 [Xiangrui Meng] Revert "SPARK-3608 Break if the instance tag naming succeeds"
Diffstat (limited to 'ec2')
-rwxr-xr-x | ec2/spark_ec2.py | 83 |
1 files changed, 22 insertions, 61 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index a5396c2375..a4ab844a56 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -138,7 +138,7 @@ def parse_args(): help="The SSH user you want to connect as (default: %default)") parser.add_option( "--delete-groups", action="store_true", default=False, - help="When destroying a cluster, delete the security groups that were created.") + help="When destroying a cluster, delete the security groups that were created") parser.add_option( "--use-existing-master", action="store_true", default=False, help="Launch fresh slaves, but use an existing stopped master if possible") @@ -153,9 +153,6 @@ def parse_args(): "--user-data", type="string", default="", help="Path to a user-data file (most AMI's interpret this as an initialization script)") parser.add_option( - "--security-group-prefix", type="string", default=None, - help="Use this prefix for the security group rather than the cluster name.") - parser.add_option( "--authorized-address", type="string", default="0.0.0.0/0", help="Address to authorize on created security groups (default: %default)") parser.add_option( @@ -305,12 +302,8 @@ def launch_cluster(conn, opts, cluster_name): user_data_content = user_data_file.read() print "Setting up security groups..." - if opts.security_group_prefix is None: - master_group = get_or_make_group(conn, cluster_name + "-master") - slave_group = get_or_make_group(conn, cluster_name + "-slaves") - else: - master_group = get_or_make_group(conn, opts.security_group_prefix + "-master") - slave_group = get_or_make_group(conn, opts.security_group_prefix + "-slaves") + master_group = get_or_make_group(conn, cluster_name + "-master") + slave_group = get_or_make_group(conn, cluster_name + "-slaves") authorized_address = opts.authorized_address if master_group.rules == []: # Group was just now created master_group.authorize(src_group=master_group) @@ -335,11 +328,12 @@ def launch_cluster(conn, opts, cluster_name): slave_group.authorize('tcp', 60060, 60060, authorized_address) slave_group.authorize('tcp', 60075, 60075, authorized_address) - # Check if instances are already running with the cluster name + # Check if instances are already running in our groups existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name, die_on_error=False) if existing_slaves or (existing_masters and not opts.use_existing_master): - print >> stderr, ("ERROR: There are already instances for name: %s " % cluster_name) + print >> stderr, ("ERROR: There are already instances running in " + + "group %s or %s" % (master_group.name, slave_group.name)) sys.exit(1) # Figure out Spark AMI @@ -413,13 +407,9 @@ def launch_cluster(conn, opts, cluster_name): for r in reqs: id_to_req[r.id] = r active_instance_ids = [] - outstanding_request_ids = [] for i in my_req_ids: - if i in id_to_req: - if id_to_req[i].state == "active": - active_instance_ids.append(id_to_req[i].instance_id) - else: - outstanding_request_ids.append(i) + if i in id_to_req and id_to_req[i].state == "active": + active_instance_ids.append(id_to_req[i].instance_id) if len(active_instance_ids) == opts.slaves: print "All %d slaves granted" % opts.slaves reservations = conn.get_all_instances(active_instance_ids) @@ -428,8 +418,8 @@ def launch_cluster(conn, opts, cluster_name): slave_nodes += r.instances break else: - print "%d of %d slaves granted, waiting longer for request ids including %s" % ( - len(active_instance_ids), opts.slaves, outstanding_request_ids[0:10]) + print "%d of %d slaves granted, waiting longer" % ( + len(active_instance_ids), opts.slaves) except: print "Canceling spot instance requests" conn.cancel_spot_instance_requests(my_req_ids) @@ -488,59 +478,34 @@ def launch_cluster(conn, opts, cluster_name): # Give the instances descriptive names for master in master_nodes: - name = '{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id) - tag_instance(master, name) - + master.add_tag( + key='Name', + value='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)) for slave in slave_nodes: - name = '{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id) - tag_instance(slave, name) + slave.add_tag( + key='Name', + value='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)) # Return all the instances return (master_nodes, slave_nodes) -def tag_instance(instance, name): - for i in range(0, 5): - try: - instance.add_tag(key='Name', value=name) - break - except: - print "Failed attempt %i of 5 to tag %s" % ((i + 1), name) - if i == 5: - raise "Error - failed max attempts to add name tag" - time.sleep(5) - # Get the EC2 instances in an existing cluster if available. # Returns a tuple of lists of EC2 instance objects for the masters and slaves def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): print "Searching for existing cluster " + cluster_name + "..." - # Search all the spot instance requests, and copy any tags from the spot - # instance request to the cluster. - spot_instance_requests = conn.get_all_spot_instance_requests() - for req in spot_instance_requests: - if req.state != u'active': - continue - name = req.tags.get(u'Name', "") - if name.startswith(cluster_name): - reservations = conn.get_all_instances(instance_ids=[req.instance_id]) - for res in reservations: - active = [i for i in res.instances if is_active(i)] - for instance in active: - if instance.tags.get(u'Name') is None: - tag_instance(instance, name) - # Now proceed to detect master and slaves instances. reservations = conn.get_all_instances() master_nodes = [] slave_nodes = [] for res in reservations: active = [i for i in res.instances if is_active(i)] for inst in active: - name = inst.tags.get(u'Name', "") - if name.startswith(cluster_name + "-master"): + group_names = [g.name for g in inst.groups] + if group_names == [cluster_name + "-master"]: master_nodes.append(inst) - elif name.startswith(cluster_name + "-slave"): + elif group_names == [cluster_name + "-slaves"]: slave_nodes.append(inst) if any((master_nodes, slave_nodes)): print "Found %d master(s), %d slaves" % (len(master_nodes), len(slave_nodes)) @@ -548,12 +513,12 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): return (master_nodes, slave_nodes) else: if master_nodes == [] and slave_nodes != []: - print >> sys.stderr, "ERROR: Could not find master in with name " + \ - cluster_name + "-master" + print >> sys.stderr, "ERROR: Could not find master in group " + cluster_name + "-master" else: print >> sys.stderr, "ERROR: Could not find any existing cluster" sys.exit(1) + # Deploy configuration files and run setup scripts on a newly launched # or started EC2 cluster. @@ -984,11 +949,7 @@ def real_main(): # Delete security groups as well if opts.delete_groups: print "Deleting security groups (this will take some time)..." - if opts.security_group_prefix is None: - group_names = [cluster_name + "-master", cluster_name + "-slaves"] - else: - group_names = [opts.security_group_prefix + "-master", - opts.security_group_prefix + "-slaves"] + group_names = [cluster_name + "-master", cluster_name + "-slaves"] wait_for_cluster_state( cluster_instances=(master_nodes + slave_nodes), cluster_state='terminated', |