From 317e114e11669899618c7c06bbc0091b36618f36 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sat, 29 Nov 2014 00:31:06 -0800 Subject: [SPARK-3398] [SPARK-4325] [EC2] Use EC2 status checks. This PR re-introduces [0e648bc](https://github.com/apache/spark/commit/0e648bc2bedcbeb55fce5efac04f6dbad9f063b4) from PR #2339, which somehow never made it into the codebase. Additionally, it removes a now-unnecessary linear backoff on the SSH checks since we are blocking on EC2 status checks before testing SSH. Author: Nicholas Chammas Closes #3195 from nchammas/remove-ec2-ssh-backoff and squashes the following commits: efb29e1 [Nicholas Chammas] Revert "Remove linear backoff." ef3ca99 [Nicholas Chammas] reuse conn adb4eaa [Nicholas Chammas] Remove linear backoff. 55caa24 [Nicholas Chammas] Check EC2 status checks before SSH. --- ec2/spark_ec2.py | 48 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 12 deletions(-) (limited to 'ec2') diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 742c7765e7..b83decadc2 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -33,6 +33,7 @@ import tempfile import time import urllib2 import warnings +from datetime import datetime from optparse import OptionParser from sys import stderr import boto @@ -589,7 +590,9 @@ def setup_spark_cluster(master, opts): def is_ssh_available(host, opts): - "Checks if SSH is available on the host." + """ + Check if SSH is available on a host. + """ try: with open(os.devnull, 'w') as devnull: ret = subprocess.check_call( @@ -604,6 +607,9 @@ def is_ssh_available(host, opts): def is_cluster_ssh_available(cluster_instances, opts): + """ + Check if SSH is available on all the instances in a cluster. + """ for i in cluster_instances: if not is_ssh_available(host=i.ip_address, opts=opts): return False @@ -611,8 +617,10 @@ def is_cluster_ssh_available(cluster_instances, opts): return True -def wait_for_cluster_state(cluster_instances, cluster_state, opts): +def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state): """ + Wait for all the instances in the cluster to reach a designated state. + cluster_instances: a list of boto.ec2.instance.Instance cluster_state: a string representing the desired state of all the instances in the cluster value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as @@ -620,20 +628,27 @@ def wait_for_cluster_state(cluster_instances, cluster_state, opts): (would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250) """ sys.stdout.write( - "Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state) + "Waiting for cluster to enter '{s}' state.".format(s=cluster_state) ) sys.stdout.flush() + start_time = datetime.now() + num_attempts = 0 + conn = ec2.connect_to_region(opts.region) while True: - time.sleep(3 * num_attempts) + time.sleep(5 * num_attempts) # seconds for i in cluster_instances: - s = i.update() # capture output to suppress print to screen in newer versions of boto + i.update() + + statuses = conn.get_all_instance_status(instance_ids=[i.id for i in cluster_instances]) if cluster_state == 'ssh-ready': if all(i.state == 'running' for i in cluster_instances) and \ + all(s.system_status.status == 'ok' for s in statuses) and \ + all(s.instance_status.status == 'ok' for s in statuses) and \ is_cluster_ssh_available(cluster_instances, opts): break else: @@ -647,6 +662,12 @@ def wait_for_cluster_state(cluster_instances, cluster_state, opts): sys.stdout.write("\n") + end_time = datetime.now() + print "Cluster is now in '{s}' state. Waited {t} seconds.".format( + s=cluster_state, + t=(end_time - start_time).seconds + ) + # Get number of local disks available for a given EC2 instance type. def get_num_disks(instance_type): @@ -895,7 +916,7 @@ def real_main(): # See: https://docs.python.org/3.5/whatsnew/2.7.html warnings.warn( "This option is deprecated and has no effect. " - "spark-ec2 automatically waits as long as necessary for clusters to startup.", + "spark-ec2 automatically waits as long as necessary for clusters to start up.", DeprecationWarning ) @@ -922,9 +943,10 @@ def real_main(): else: (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name) wait_for_cluster_state( + conn=conn, + opts=opts, cluster_instances=(master_nodes + slave_nodes), - cluster_state='ssh-ready', - opts=opts + cluster_state='ssh-ready' ) setup_cluster(conn, master_nodes, slave_nodes, opts, True) @@ -951,9 +973,10 @@ def real_main(): print "Deleting security groups (this will take some time)..." group_names = [cluster_name + "-master", cluster_name + "-slaves"] wait_for_cluster_state( + conn=conn, + opts=opts, cluster_instances=(master_nodes + slave_nodes), - cluster_state='terminated', - opts=opts + cluster_state='terminated' ) attempt = 1 while attempt <= 3: @@ -1055,9 +1078,10 @@ def real_main(): if inst.state not in ["shutting-down", "terminated"]: inst.start() wait_for_cluster_state( + conn=conn, + opts=opts, cluster_instances=(master_nodes + slave_nodes), - cluster_state='ssh-ready', - opts=opts + cluster_state='ssh-ready' ) setup_cluster(conn, master_nodes, slave_nodes, opts, False) -- cgit v1.2.3