aboutsummaryrefslogtreecommitdiff
path: root/ec2
diff options
context:
space:
mode:
authorNicholas Chammas <nicholas.chammas@gmail.com>2014-11-29 00:31:06 -0800
committerJosh Rosen <joshrosen@databricks.com>2014-11-29 00:31:06 -0800
commit317e114e11669899618c7c06bbc0091b36618f36 (patch)
treec96350e51dcd7dea91eff9f8c1432cb0bd34d29f /ec2
parent047ff573f7e2d5f1320f8e8121e185ffebee61e0 (diff)
downloadspark-317e114e11669899618c7c06bbc0091b36618f36.tar.gz
spark-317e114e11669899618c7c06bbc0091b36618f36.tar.bz2
spark-317e114e11669899618c7c06bbc0091b36618f36.zip
[SPARK-3398] [SPARK-4325] [EC2] Use EC2 status checks.
This PR re-introduces [0e648bc](https://github.com/apache/spark/commit/0e648bc2bedcbeb55fce5efac04f6dbad9f063b4) from PR #2339, which somehow never made it into the codebase. Additionally, it removes a now-unnecessary linear backoff on the SSH checks since we are blocking on EC2 status checks before testing SSH. Author: Nicholas Chammas <nicholas.chammas@gmail.com> Closes #3195 from nchammas/remove-ec2-ssh-backoff and squashes the following commits: efb29e1 [Nicholas Chammas] Revert "Remove linear backoff." ef3ca99 [Nicholas Chammas] reuse conn adb4eaa [Nicholas Chammas] Remove linear backoff. 55caa24 [Nicholas Chammas] Check EC2 status checks before SSH.
Diffstat (limited to 'ec2')
-rwxr-xr-xec2/spark_ec2.py48
1 files changed, 36 insertions, 12 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 742c7765e7..b83decadc2 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -33,6 +33,7 @@ import tempfile
import time
import urllib2
import warnings
+from datetime import datetime
from optparse import OptionParser
from sys import stderr
import boto
@@ -589,7 +590,9 @@ def setup_spark_cluster(master, opts):
def is_ssh_available(host, opts):
- "Checks if SSH is available on the host."
+ """
+ Check if SSH is available on a host.
+ """
try:
with open(os.devnull, 'w') as devnull:
ret = subprocess.check_call(
@@ -604,6 +607,9 @@ def is_ssh_available(host, opts):
def is_cluster_ssh_available(cluster_instances, opts):
+ """
+ Check if SSH is available on all the instances in a cluster.
+ """
for i in cluster_instances:
if not is_ssh_available(host=i.ip_address, opts=opts):
return False
@@ -611,8 +617,10 @@ def is_cluster_ssh_available(cluster_instances, opts):
return True
-def wait_for_cluster_state(cluster_instances, cluster_state, opts):
+def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
"""
+ Wait for all the instances in the cluster to reach a designated state.
+
cluster_instances: a list of boto.ec2.instance.Instance
cluster_state: a string representing the desired state of all the instances in the cluster
value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
@@ -620,20 +628,27 @@ def wait_for_cluster_state(cluster_instances, cluster_state, opts):
(would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
"""
sys.stdout.write(
- "Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state)
+ "Waiting for cluster to enter '{s}' state.".format(s=cluster_state)
)
sys.stdout.flush()
+ start_time = datetime.now()
+
num_attempts = 0
+ conn = ec2.connect_to_region(opts.region)
while True:
- time.sleep(3 * num_attempts)
+ time.sleep(5 * num_attempts) # seconds
for i in cluster_instances:
- s = i.update() # capture output to suppress print to screen in newer versions of boto
+ i.update()
+
+ statuses = conn.get_all_instance_status(instance_ids=[i.id for i in cluster_instances])
if cluster_state == 'ssh-ready':
if all(i.state == 'running' for i in cluster_instances) and \
+ all(s.system_status.status == 'ok' for s in statuses) and \
+ all(s.instance_status.status == 'ok' for s in statuses) and \
is_cluster_ssh_available(cluster_instances, opts):
break
else:
@@ -647,6 +662,12 @@ def wait_for_cluster_state(cluster_instances, cluster_state, opts):
sys.stdout.write("\n")
+ end_time = datetime.now()
+ print "Cluster is now in '{s}' state. Waited {t} seconds.".format(
+ s=cluster_state,
+ t=(end_time - start_time).seconds
+ )
+
# Get number of local disks available for a given EC2 instance type.
def get_num_disks(instance_type):
@@ -895,7 +916,7 @@ def real_main():
# See: https://docs.python.org/3.5/whatsnew/2.7.html
warnings.warn(
"This option is deprecated and has no effect. "
- "spark-ec2 automatically waits as long as necessary for clusters to startup.",
+ "spark-ec2 automatically waits as long as necessary for clusters to start up.",
DeprecationWarning
)
@@ -922,9 +943,10 @@ def real_main():
else:
(master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
wait_for_cluster_state(
+ conn=conn,
+ opts=opts,
cluster_instances=(master_nodes + slave_nodes),
- cluster_state='ssh-ready',
- opts=opts
+ cluster_state='ssh-ready'
)
setup_cluster(conn, master_nodes, slave_nodes, opts, True)
@@ -951,9 +973,10 @@ def real_main():
print "Deleting security groups (this will take some time)..."
group_names = [cluster_name + "-master", cluster_name + "-slaves"]
wait_for_cluster_state(
+ conn=conn,
+ opts=opts,
cluster_instances=(master_nodes + slave_nodes),
- cluster_state='terminated',
- opts=opts
+ cluster_state='terminated'
)
attempt = 1
while attempt <= 3:
@@ -1055,9 +1078,10 @@ def real_main():
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
wait_for_cluster_state(
+ conn=conn,
+ opts=opts,
cluster_instances=(master_nodes + slave_nodes),
- cluster_state='ssh-ready',
- opts=opts
+ cluster_state='ssh-ready'
)
setup_cluster(conn, master_nodes, slave_nodes, opts, False)