author     Xiangrui Meng <meng@databricks.com>  2014-11-25 16:07:09 -0800
committer  Andrew Or <andrew@databricks.com>  2014-11-25 16:07:09 -0800
commit     7eba0fbe456c451122d7a2353ff0beca00f15223 (patch)
tree       ca88df11d76b55975c5d5ab5e772bcf2ff3120bc /ec2
parent     9bdf5da59036c0b052df756fc4a28d64677072e7 (diff)
[SPARK-4509] Revert EC2 tag-based cluster membership patch
This PR reverts changes related to tag-based cluster membership. As discussed in SPARK-3332, we didn't figure out a safe strategy to use tags to determine cluster membership, because tagging is not atomic. The following changes are reverted:

SPARK-2333: 94053a7b766788bb62e2dbbf352ccbcc75f71fc0
SPARK-3213: 7faf755ae4f0cf510048e432340260a6e609066d
SPARK-3608: 78d4220fa0bf2f9ee663e34bbf3544a5313b02f0

I tested launch, login, and destroy. It is easy to check the diff by comparing it to Josh's patch for branch-1.1: https://github.com/apache/spark/pull/2225/files

JoshRosen I sent the PR to master. It might be easier for us to keep master and branch-1.2 the same at this time. We can always re-apply the patch once we figure out a stable solution.

Author: Xiangrui Meng <meng@databricks.com>

Closes #3453 from mengxr/SPARK-4509 and squashes the following commits:

f0b708b [Xiangrui Meng] revert 94053a7b766788bb62e2dbbf352ccbcc75f71fc0
4298ea5 [Xiangrui Meng] revert 7faf755ae4f0cf510048e432340260a6e609066d
35963a1 [Xiangrui Meng] Revert "SPARK-3608 Break if the instance tag naming succeeds"
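Editor's note: the race behind "tagging is not atomic" is that EC2 launches an instance first and applies its Name tag in a separate API call afterwards, so a tag-based membership scan can see a cluster node before (or without) its tag. A minimal sketch of that window, in the Python 2 / boto 2 style spark_ec2.py uses; the region, AMI id, instance type, and cluster name below are placeholders, not values from this patch:

# Illustrative only (not part of the patch): the launch-then-tag window
# described in SPARK-3332. Assumes a boto 2 EC2 connection; all concrete
# values below are placeholders.
import boto.ec2

conn = boto.ec2.connect_to_region("us-east-1")
cluster_name = "my-cluster"

# Step 1: the instance starts running inside the cluster's security group...
res = conn.run_instances("ami-00000000",
                         instance_type="m1.large",
                         security_groups=[cluster_name + "-slaves"])
inst = res.instances[0]

# Step 2: ...but the Name tag only exists after this second, separate call.
# A concurrent scan that identifies cluster members by tag can run between
# the two steps and miss this node, and a failed add_tag leaves it unnamed.
inst.add_tag(key='Name',
             value='{cn}-slave-{iid}'.format(cn=cluster_name, iid=inst.id))

Security groups, by contrast, are fixed as part of the launch request itself, which is why the diff below goes back to deriving membership from the <cluster_name>-master and <cluster_name>-slaves groups.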
Diffstat (limited to 'ec2')
-rwxr-xr-x  ec2/spark_ec2.py  83
1 file changed, 22 insertions(+), 61 deletions(-)
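For readability, the group-based lookup that the diff below restores in get_existing_cluster() can be read in isolation roughly as follows. This is a sketch only, assuming a boto 2 connection; the helper name find_cluster_nodes is hypothetical and the real logic is in the diff itself:

# Sketch of the restored lookup: classify instances by the security group
# they were launched into. Helper name is hypothetical; see
# get_existing_cluster() in the diff below for the actual code.
def find_cluster_nodes(conn, cluster_name):
    master_nodes, slave_nodes = [], []
    for res in conn.get_all_instances():
        for inst in res.instances:
            if inst.state in ("shutting-down", "terminated"):
                continue  # same filtering as the script's is_active() check
            group_names = [g.name for g in inst.groups]
            if group_names == [cluster_name + "-master"]:
                master_nodes.append(inst)
            elif group_names == [cluster_name + "-slaves"]:
                slave_nodes.append(inst)
    return master_nodes, slave_nodes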
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index a5396c2375..a4ab844a56 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -138,7 +138,7 @@ def parse_args():
help="The SSH user you want to connect as (default: %default)")
parser.add_option(
"--delete-groups", action="store_true", default=False,
- help="When destroying a cluster, delete the security groups that were created.")
+ help="When destroying a cluster, delete the security groups that were created")
parser.add_option(
"--use-existing-master", action="store_true", default=False,
help="Launch fresh slaves, but use an existing stopped master if possible")
@@ -153,9 +153,6 @@ def parse_args():
"--user-data", type="string", default="",
help="Path to a user-data file (most AMI's interpret this as an initialization script)")
parser.add_option(
- "--security-group-prefix", type="string", default=None,
- help="Use this prefix for the security group rather than the cluster name.")
- parser.add_option(
"--authorized-address", type="string", default="0.0.0.0/0",
help="Address to authorize on created security groups (default: %default)")
parser.add_option(
@@ -305,12 +302,8 @@ def launch_cluster(conn, opts, cluster_name):
             user_data_content = user_data_file.read()
 
     print "Setting up security groups..."
-    if opts.security_group_prefix is None:
-        master_group = get_or_make_group(conn, cluster_name + "-master")
-        slave_group = get_or_make_group(conn, cluster_name + "-slaves")
-    else:
-        master_group = get_or_make_group(conn, opts.security_group_prefix + "-master")
-        slave_group = get_or_make_group(conn, opts.security_group_prefix + "-slaves")
+    master_group = get_or_make_group(conn, cluster_name + "-master")
+    slave_group = get_or_make_group(conn, cluster_name + "-slaves")
     authorized_address = opts.authorized_address
     if master_group.rules == []:  # Group was just now created
         master_group.authorize(src_group=master_group)
@@ -335,11 +328,12 @@ def launch_cluster(conn, opts, cluster_name):
         slave_group.authorize('tcp', 60060, 60060, authorized_address)
         slave_group.authorize('tcp', 60075, 60075, authorized_address)
 
-    # Check if instances are already running with the cluster name
+    # Check if instances are already running in our groups
     existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
                                                              die_on_error=False)
     if existing_slaves or (existing_masters and not opts.use_existing_master):
-        print >> stderr, ("ERROR: There are already instances for name: %s " % cluster_name)
+        print >> stderr, ("ERROR: There are already instances running in " +
+                          "group %s or %s" % (master_group.name, slave_group.name))
         sys.exit(1)
 
     # Figure out Spark AMI
@@ -413,13 +407,9 @@ def launch_cluster(conn, opts, cluster_name):
                 for r in reqs:
                     id_to_req[r.id] = r
                 active_instance_ids = []
-                outstanding_request_ids = []
                 for i in my_req_ids:
-                    if i in id_to_req:
-                        if id_to_req[i].state == "active":
-                            active_instance_ids.append(id_to_req[i].instance_id)
-                        else:
-                            outstanding_request_ids.append(i)
+                    if i in id_to_req and id_to_req[i].state == "active":
+                        active_instance_ids.append(id_to_req[i].instance_id)
                 if len(active_instance_ids) == opts.slaves:
                     print "All %d slaves granted" % opts.slaves
                     reservations = conn.get_all_instances(active_instance_ids)
@@ -428,8 +418,8 @@ def launch_cluster(conn, opts, cluster_name):
                         slave_nodes += r.instances
                     break
                 else:
-                    print "%d of %d slaves granted, waiting longer for request ids including %s" % (
-                        len(active_instance_ids), opts.slaves, outstanding_request_ids[0:10])
+                    print "%d of %d slaves granted, waiting longer" % (
+                        len(active_instance_ids), opts.slaves)
         except:
             print "Canceling spot instance requests"
             conn.cancel_spot_instance_requests(my_req_ids)
@@ -488,59 +478,34 @@ def launch_cluster(conn, opts, cluster_name):
     # Give the instances descriptive names
     for master in master_nodes:
-        name = '{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)
-        tag_instance(master, name)
-
+        master.add_tag(
+            key='Name',
+            value='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id))
     for slave in slave_nodes:
-        name = '{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)
-        tag_instance(slave, name)
+        slave.add_tag(
+            key='Name',
+            value='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id))
 
     # Return all the instances
     return (master_nodes, slave_nodes)
 
 
-def tag_instance(instance, name):
-    for i in range(0, 5):
-        try:
-            instance.add_tag(key='Name', value=name)
-            break
-        except:
-            print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
-            if i == 5:
-                raise "Error - failed max attempts to add name tag"
-            time.sleep(5)
-
 # Get the EC2 instances in an existing cluster if available.
 # Returns a tuple of lists of EC2 instance objects for the masters and slaves
 def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
     print "Searching for existing cluster " + cluster_name + "..."
-    # Search all the spot instance requests, and copy any tags from the spot
-    # instance request to the cluster.
-    spot_instance_requests = conn.get_all_spot_instance_requests()
-    for req in spot_instance_requests:
-        if req.state != u'active':
-            continue
-        name = req.tags.get(u'Name', "")
-        if name.startswith(cluster_name):
-            reservations = conn.get_all_instances(instance_ids=[req.instance_id])
-            for res in reservations:
-                active = [i for i in res.instances if is_active(i)]
-                for instance in active:
-                    if instance.tags.get(u'Name') is None:
-                        tag_instance(instance, name)
-    # Now proceed to detect master and slaves instances.
     reservations = conn.get_all_instances()
     master_nodes = []
     slave_nodes = []
     for res in reservations:
         active = [i for i in res.instances if is_active(i)]
         for inst in active:
-            name = inst.tags.get(u'Name', "")
-            if name.startswith(cluster_name + "-master"):
+            group_names = [g.name for g in inst.groups]
+            if group_names == [cluster_name + "-master"]:
                 master_nodes.append(inst)
-            elif name.startswith(cluster_name + "-slave"):
+            elif group_names == [cluster_name + "-slaves"]:
                 slave_nodes.append(inst)
     if any((master_nodes, slave_nodes)):
         print "Found %d master(s), %d slaves" % (len(master_nodes), len(slave_nodes))
@@ -548,12 +513,12 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
         return (master_nodes, slave_nodes)
     else:
         if master_nodes == [] and slave_nodes != []:
-            print >> sys.stderr, "ERROR: Could not find master in with name " + \
-                cluster_name + "-master"
+            print >> sys.stderr, "ERROR: Could not find master in group " + cluster_name + "-master"
         else:
             print >> sys.stderr, "ERROR: Could not find any existing cluster"
         sys.exit(1)
 
+
 # Deploy configuration files and run setup scripts on a newly launched
 # or started EC2 cluster.
@@ -984,11 +949,7 @@ def real_main():
             # Delete security groups as well
             if opts.delete_groups:
                 print "Deleting security groups (this will take some time)..."
-                if opts.security_group_prefix is None:
-                    group_names = [cluster_name + "-master", cluster_name + "-slaves"]
-                else:
-                    group_names = [opts.security_group_prefix + "-master",
-                                   opts.security_group_prefix + "-slaves"]
+                group_names = [cluster_name + "-master", cluster_name + "-slaves"]
                 wait_for_cluster_state(
                     cluster_instances=(master_nodes + slave_nodes),
                     cluster_state='terminated',