author: Nicholas Chammas <nicholas.chammas@gmail.com>  2014-10-07 16:54:32 -0700
committer: Josh Rosen <joshrosen@apache.org>  2014-10-07 16:54:32 -0700
commit: 5912ca67140eed5dea66745aa3af4febdbd80781 (patch)
tree: c01f62760a782ac3d26f4a1eda118d344cfe5a95 /ec2
parent: b32bb72e812731d28bf05f2145314c63806f3335 (diff)
[SPARK-3398] [EC2] Have spark-ec2 intelligently wait for specific cluster states
Instead of waiting arbitrary amounts of time for the cluster to reach a specific state, this patch lets `spark-ec2` explicitly wait for a cluster to reach a desired state. This is useful in a couple of situations:

* The cluster is launching and you want to wait until SSH is available before installing stuff.
* The cluster is being terminated and you want to wait until all the instances are terminated before trying to delete security groups.

This patch removes the need for the `--wait` option and removes some of the time-based retry logic that was being used.

Author: Nicholas Chammas <nicholas.chammas@gmail.com>

Closes #2339 from nchammas/spark-ec2-wait-properly and squashes the following commits:

43a69f0 [Nicholas Chammas] short-circuit SSH check; linear backoff
9a9e035 [Nicholas Chammas] remove extraneous comment
26c5ed0 [Nicholas Chammas] replace print with write()
bb67c06 [Nicholas Chammas] deprecate wait option; remove dead code
7969265 [Nicholas Chammas] fix long line (PEP 8)
126e4cf [Nicholas Chammas] wait for specific cluster states
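As a rough illustration of the approach the commit message describes, the sketch below polls a set of instances with a linear backoff until they all report a target state. It is a minimal sketch, not the patch itself: the function name `wait_for_state` and its parameters are hypothetical, it assumes boto instance objects are passed in, and it omits the SSH-readiness check. The actual implementation is `wait_for_cluster_state` in the diff below.

    # Minimal sketch (not the patch itself) of the wait-for-state pattern:
    # poll the instances, sleeping a little longer on each attempt (linear
    # backoff), until every instance reports the desired state.
    import sys
    import time

    def wait_for_state(instances, target_state):
        """instances: boto.ec2.instance.Instance objects; target_state: e.g. 'running'."""
        attempt = 0
        while True:
            time.sleep(3 * attempt)      # linear backoff: 0s, 3s, 6s, ...
            for instance in instances:
                instance.update()        # refresh the instance's state from EC2
            if all(i.state == target_state for i in instances):
                break
            attempt += 1
            sys.stdout.write(".")
            sys.stdout.flush()
        sys.stdout.write("\n")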
Diffstat (limited to 'ec2')
-rwxr-xr-x  ec2/spark_ec2.py | 111
1 file changed, 86 insertions, 25 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 941dfb988b..27f468ea4f 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -32,6 +32,7 @@ import sys
import tempfile
import time
import urllib2
+import warnings
from optparse import OptionParser
from sys import stderr
import boto
@@ -61,8 +62,8 @@ def parse_args():
"-s", "--slaves", type="int", default=1,
help="Number of slaves to launch (default: %default)")
parser.add_option(
- "-w", "--wait", type="int", default=120,
- help="Seconds to wait for nodes to start (default: %default)")
+ "-w", "--wait", type="int",
+ help="DEPRECATED (no longer necessary) - Seconds to wait for nodes to start")
parser.add_option(
"-k", "--key-pair",
help="Key pair to use on instances")
@@ -195,18 +196,6 @@ def get_or_make_group(conn, name):
return conn.create_security_group(name, "Spark EC2 group")
-# Wait for a set of launched instances to exit the "pending" state
-# (i.e. either to start running or to fail and be terminated)
-def wait_for_instances(conn, instances):
- while True:
- for i in instances:
- i.update()
- if len([i for i in instances if i.state == 'pending']) > 0:
- time.sleep(5)
- else:
- return
-
-
# Check whether a given EC2 instance object is in a state we consider active,
# i.e. not terminating or terminated. We count both stopping and stopped as
# active since we can restart stopped clusters.
@@ -619,14 +608,64 @@ def setup_spark_cluster(master, opts):
print "Ganglia started at http://%s:5080/ganglia" % master
-# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
-def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes):
- print "Waiting for instances to start up..."
- time.sleep(5)
- wait_for_instances(conn, master_nodes)
- wait_for_instances(conn, slave_nodes)
- print "Waiting %d more seconds..." % wait_secs
- time.sleep(wait_secs)
+def is_ssh_available(host, opts):
+ "Checks if SSH is available on the host."
+ try:
+ with open(os.devnull, 'w') as devnull:
+ ret = subprocess.check_call(
+ ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
+ '%s@%s' % (opts.user, host), stringify_command('true')],
+ stdout=devnull,
+ stderr=devnull
+ )
+ return ret == 0
+ except subprocess.CalledProcessError as e:
+ return False
+
+
+def is_cluster_ssh_available(cluster_instances, opts):
+ for i in cluster_instances:
+ if not is_ssh_available(host=i.ip_address, opts=opts):
+ return False
+ else:
+ return True
+
+
+def wait_for_cluster_state(cluster_instances, cluster_state, opts):
+ """
+ cluster_instances: a list of boto.ec2.instance.Instance
+ cluster_state: a string representing the desired state of all the instances in the cluster
+ value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
+ 'running', 'terminated', etc.
+ (would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
+ """
+ sys.stdout.write(
+ "Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state)
+ )
+ sys.stdout.flush()
+
+ num_attempts = 0
+
+ while True:
+ time.sleep(3 * num_attempts)
+
+ for i in cluster_instances:
+ s = i.update() # capture output to suppress print to screen in newer versions of boto
+
+ if cluster_state == 'ssh-ready':
+ if all(i.state == 'running' for i in cluster_instances) and \
+ is_cluster_ssh_available(cluster_instances, opts):
+ break
+ else:
+ if all(i.state == cluster_state for i in cluster_instances):
+ break
+
+ num_attempts += 1
+
+ sys.stdout.write(".")
+ sys.stdout.flush()
+
+ sys.stdout.write("\n")
# Get number of local disks available for a given EC2 instance type.
@@ -868,6 +907,16 @@ def real_main():
(opts, action, cluster_name) = parse_args()
# Input parameter validation
+ if opts.wait is not None:
+ # NOTE: DeprecationWarnings are silent in 2.7+ by default.
+ # To show them, run Python with the -Wdefault switch.
+ # See: https://docs.python.org/3.5/whatsnew/2.7.html
+ warnings.warn(
+ "This option is deprecated and has no effect. "
+ "spark-ec2 automatically waits as long as necessary for clusters to startup.",
+ DeprecationWarning
+ )
+
if opts.ebs_vol_num > 8:
print >> stderr, "ebs-vol-num cannot be greater than 8"
sys.exit(1)
@@ -890,7 +939,11 @@ def real_main():
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
else:
(master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
- wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
+ wait_for_cluster_state(
+ cluster_instances=(master_nodes + slave_nodes),
+ cluster_state='ssh-ready',
+ opts=opts
+ )
setup_cluster(conn, master_nodes, slave_nodes, opts, True)
elif action == "destroy":
@@ -919,7 +972,11 @@ def real_main():
else:
group_names = [opts.security_group_prefix + "-master",
opts.security_group_prefix + "-slaves"]
-
+ wait_for_cluster_state(
+ cluster_instances=(master_nodes + slave_nodes),
+ cluster_state='terminated',
+ opts=opts
+ )
attempt = 1
while attempt <= 3:
print "Attempt %d" % attempt
@@ -1019,7 +1076,11 @@ def real_main():
for inst in master_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
- wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
+ wait_for_cluster_state(
+ cluster_instances=(master_nodes + slave_nodes),
+ cluster_state='ssh-ready',
+ opts=opts
+ )
setup_cluster(conn, master_nodes, slave_nodes, opts, False)
else: