aboutsummaryrefslogtreecommitdiff
path: root/ec2/spark_ec2.py
diff options
context:
space:
mode:
Diffstat (limited to 'ec2/spark_ec2.py')
-rwxr-xr-xec2/spark_ec2.py36
1 files changed, 24 insertions, 12 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 725b1e47e0..87b2112fe4 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -34,6 +34,7 @@ import subprocess
import sys
import tarfile
import tempfile
+import textwrap
import time
import urllib2
import warnings
@@ -681,21 +682,32 @@ def setup_spark_cluster(master, opts):
print "Ganglia started at http://%s:5080/ganglia" % master
-def is_ssh_available(host, opts):
+def is_ssh_available(host, opts, print_ssh_output=True):
"""
Check if SSH is available on a host.
"""
- try:
- with open(os.devnull, 'w') as devnull:
- ret = subprocess.check_call(
- ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
- '%s@%s' % (opts.user, host), stringify_command('true')],
- stdout=devnull,
- stderr=devnull
- )
- return ret == 0
- except subprocess.CalledProcessError as e:
- return False
+ s = subprocess.Popen(
+ ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
+ '%s@%s' % (opts.user, host), stringify_command('true')],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT # we pipe stderr through stdout to preserve output order
+ )
+ cmd_output = s.communicate()[0] # [1] is stderr, which we redirected to stdout
+
+ if s.returncode != 0 and print_ssh_output:
+ # extra leading newline is for spacing in wait_for_cluster_state()
+ print textwrap.dedent("""\n
+ Warning: SSH connection error. (This could be temporary.)
+ Host: {h}
+ SSH return code: {r}
+ SSH output: {o}
+ """).format(
+ h=host,
+ r=s.returncode,
+ o=cmd_output.strip()
+ )
+
+ return s.returncode == 0
def is_cluster_ssh_available(cluster_instances, opts):