about | summary | refs | log | tree | commit | diff
path: root/ec2
diff options
context:
space:
mode:
author Davies Liu <davies@databricks.com> 2015-04-16 16:20:57 -0700
committer Josh Rosen <joshrosen@databricks.com> 2015-04-16 16:20:57 -0700
commit 04e44b37cc04f62fbf9e08c7076349e0a4d12ea8 (patch)
tree b6429253955210445ddc37faa4d5166ea25a91e2 /ec2
parent 55f553a979db925aa0c3559f7e80b99d2bf3feb4 (diff)
download spark-04e44b37cc04f62fbf9e08c7076349e0a4d12ea8.tar.gz
spark-04e44b37cc04f62fbf9e08c7076349e0a4d12ea8.tar.bz2
spark-04e44b37cc04f62fbf9e08c7076349e0a4d12ea8.zip
[SPARK-4897] [PySpark] Python 3 support
This PR updates PySpark to support Python 3 (tested with 3.4). Known issue: unpickling arrays from Pyrolite is broken in Python 3, so those tests are skipped. TODO: ec2/spark_ec2.py is not fully tested with Python 3. Author: Davies Liu <davies@databricks.com> Author: twneale <twneale@gmail.com> Author: Josh Rosen <joshrosen@databricks.com> Closes #5173 from davies/python3 and squashes the following commits: d7d6323 [Davies Liu] fix tests 6c52a98 [Davies Liu] fix mllib test 99e334f [Davies Liu] update timeout b716610 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 cafd5ec [Davies Liu] address comments from @mengxr bf225d7 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 179fc8d [Davies Liu] tuning flaky tests 8c8b957 [Davies Liu] fix ResourceWarning in Python 3 5c57c95 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 4006829 [Davies Liu] fix test 2fc0066 [Davies Liu] add python3 path 71535e9 [Davies Liu] fix xrange and divide 5a55ab4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 125f12c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ed498c8 [Davies Liu] fix compatibility with python 3 820e649 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 e8ce8c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ad7c374 [Davies Liu] fix mllib test and warning ef1fc2f [Davies Liu] fix tests 4eee14a [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 20112ff [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 59bb492 [Davies Liu] fix tests 1da268c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ca0fdd3 [Davies Liu] fix code style 9563a15 [Davies Liu] add imap back for python 2 0b1ec04 [Davies Liu] make python examples work with Python 3 d2fd566 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 a716d34 [Davies 
Liu] test with python 3.4 f1700e8 [Davies Liu] fix test in python3 671b1db [Davies Liu] fix test in python3 692ff47 [Davies Liu] fix flaky test 7b9699f [Davies Liu] invalidate import cache for Python 3.3+ 9c58497 [Davies Liu] fix kill worker 309bfbf [Davies Liu] keep compatibility 5707476 [Davies Liu] cleanup, fix hash of string in 3.3+ 8662d5b [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 f53e1f0 [Davies Liu] fix tests 70b6b73 [Davies Liu] compile ec2/spark_ec2.py in python 3 a39167e [Davies Liu] support customize class in __main__ 814c77b [Davies Liu] run unittests with python 3 7f4476e [Davies Liu] mllib tests passed d737924 [Davies Liu] pass ml tests 375ea17 [Davies Liu] SQL tests pass 6cc42a9 [Davies Liu] rename 431a8de [Davies Liu] streaming tests pass 78901a7 [Davies Liu] fix hash of serializer in Python 3 24b2f2e [Davies Liu] pass all RDD tests 35f48fe [Davies Liu] run future again 1eebac2 [Davies Liu] fix conflict in ec2/spark_ec2.py 6e3c21d [Davies Liu] make cloudpickle work with Python3 2fb2db3 [Josh Rosen] Guard more changes behind sys.version; still doesn't run 1aa5e8f [twneale] Turned out `pickle.DictionaryType is dict` == True, so swapped it out 7354371 [twneale] buffer --> memoryview I'm not super sure if this is a valid change, but the 2.7 docs recommend using memoryview over buffer where possible, so hoping it'll work. b69ccdf [twneale] Uses the pure python pickle._Pickler instead of c-extension _pickle.Pickler. It appears pyspark 2.7 uses the pure python pickler as well, so this shouldn't degrade pickling performance (?). 
f40d925 [twneale] xrange --> range e104215 [twneale] Replaces 2.7 types.InstanceType with 3.4 `object`....could be horribly wrong depending on how types.InstanceType is used elsewhere in the package--see http://bugs.python.org/issue8206 79de9d0 [twneale] Replaces python2.7 `file` with 3.4 _io.TextIOWrapper 2adb42d [Josh Rosen] Fix up some import differences between Python 2 and 3 854be27 [Josh Rosen] Run `futurize` on Python code: 7c5b4ce [Josh Rosen] Remove Python 3 check in shell.py.
Diffstat (limited to 'ec2')
-rwxr-xr-x ec2/spark_ec2.py 262
1 files changed, 131 insertions, 131 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 0c1f24761d..87c0818279 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -19,7 +19,7 @@
# limitations under the License.
#
-from __future__ import with_statement
+from __future__ import with_statement, print_function
import hashlib
import itertools
@@ -37,12 +37,17 @@ import tarfile
import tempfile
import textwrap
import time
-import urllib2
import warnings
from datetime import datetime
from optparse import OptionParser
from sys import stderr
+if sys.version < "3":
+ from urllib2 import urlopen, Request, HTTPError
+else:
+ from urllib.request import urlopen, Request
+ from urllib.error import HTTPError
+
SPARK_EC2_VERSION = "1.2.1"
SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -88,10 +93,10 @@ def setup_external_libs(libs):
SPARK_EC2_LIB_DIR = os.path.join(SPARK_EC2_DIR, "lib")
if not os.path.exists(SPARK_EC2_LIB_DIR):
- print "Downloading external libraries that spark-ec2 needs from PyPI to {path}...".format(
+ print("Downloading external libraries that spark-ec2 needs from PyPI to {path}...".format(
path=SPARK_EC2_LIB_DIR
- )
- print "This should be a one-time operation."
+ ))
+ print("This should be a one-time operation.")
os.mkdir(SPARK_EC2_LIB_DIR)
for lib in libs:
@@ -100,8 +105,8 @@ def setup_external_libs(libs):
if not os.path.isdir(lib_dir):
tgz_file_path = os.path.join(SPARK_EC2_LIB_DIR, versioned_lib_name + ".tar.gz")
- print " - Downloading {lib}...".format(lib=lib["name"])
- download_stream = urllib2.urlopen(
+ print(" - Downloading {lib}...".format(lib=lib["name"]))
+ download_stream = urlopen(
"{prefix}/{first_letter}/{lib_name}/{lib_name}-{lib_version}.tar.gz".format(
prefix=PYPI_URL_PREFIX,
first_letter=lib["name"][:1],
@@ -113,13 +118,13 @@ def setup_external_libs(libs):
tgz_file.write(download_stream.read())
with open(tgz_file_path) as tar:
if hashlib.md5(tar.read()).hexdigest() != lib["md5"]:
- print >> stderr, "ERROR: Got wrong md5sum for {lib}.".format(lib=lib["name"])
+ print("ERROR: Got wrong md5sum for {lib}.".format(lib=lib["name"]), file=stderr)
sys.exit(1)
tar = tarfile.open(tgz_file_path)
tar.extractall(path=SPARK_EC2_LIB_DIR)
tar.close()
os.remove(tgz_file_path)
- print " - Finished downloading {lib}.".format(lib=lib["name"])
+ print(" - Finished downloading {lib}.".format(lib=lib["name"]))
sys.path.insert(1, lib_dir)
@@ -299,12 +304,12 @@ def parse_args():
if home_dir is None or not os.path.isfile(home_dir + '/.boto'):
if not os.path.isfile('/etc/boto.cfg'):
if os.getenv('AWS_ACCESS_KEY_ID') is None:
- print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " +
- "must be set")
+ print("ERROR: The environment variable AWS_ACCESS_KEY_ID must be set",
+ file=stderr)
sys.exit(1)
if os.getenv('AWS_SECRET_ACCESS_KEY') is None:
- print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " +
- "must be set")
+ print("ERROR: The environment variable AWS_SECRET_ACCESS_KEY must be set",
+ file=stderr)
sys.exit(1)
return (opts, action, cluster_name)
@@ -316,7 +321,7 @@ def get_or_make_group(conn, name, vpc_id):
if len(group) > 0:
return group[0]
else:
- print "Creating security group " + name
+ print("Creating security group " + name)
return conn.create_security_group(name, "Spark EC2 group", vpc_id)
@@ -324,18 +329,19 @@ def get_validate_spark_version(version, repo):
if "." in version:
version = version.replace("v", "")
if version not in VALID_SPARK_VERSIONS:
- print >> stderr, "Don't know about Spark version: {v}".format(v=version)
+ print("Don't know about Spark version: {v}".format(v=version), file=stderr)
sys.exit(1)
return version
else:
github_commit_url = "{repo}/commit/{commit_hash}".format(repo=repo, commit_hash=version)
- request = urllib2.Request(github_commit_url)
+ request = Request(github_commit_url)
request.get_method = lambda: 'HEAD'
try:
- response = urllib2.urlopen(request)
- except urllib2.HTTPError, e:
- print >> stderr, "Couldn't validate Spark commit: {url}".format(url=github_commit_url)
- print >> stderr, "Received HTTP response code of {code}.".format(code=e.code)
+ response = urlopen(request)
+ except HTTPError as e:
+ print("Couldn't validate Spark commit: {url}".format(url=github_commit_url),
+ file=stderr)
+ print("Received HTTP response code of {code}.".format(code=e.code), file=stderr)
sys.exit(1)
return version
@@ -394,8 +400,7 @@ def get_spark_ami(opts):
instance_type = EC2_INSTANCE_TYPES[opts.instance_type]
else:
instance_type = "pvm"
- print >> stderr,\
- "Don't recognize %s, assuming type is pvm" % opts.instance_type
+ print("Don't recognize %s, assuming type is pvm" % opts.instance_type, file=stderr)
# URL prefix from which to fetch AMI information
ami_prefix = "{r}/{b}/ami-list".format(
@@ -404,10 +409,10 @@ def get_spark_ami(opts):
ami_path = "%s/%s/%s" % (ami_prefix, opts.region, instance_type)
try:
- ami = urllib2.urlopen(ami_path).read().strip()
- print "Spark AMI: " + ami
+ ami = urlopen(ami_path).read().strip()
+ print("Spark AMI: " + ami)
except:
- print >> stderr, "Could not resolve AMI at: " + ami_path
+ print("Could not resolve AMI at: " + ami_path, file=stderr)
sys.exit(1)
return ami
@@ -419,11 +424,11 @@ def get_spark_ami(opts):
# Fails if there already instances running in the cluster's groups.
def launch_cluster(conn, opts, cluster_name):
if opts.identity_file is None:
- print >> stderr, "ERROR: Must provide an identity file (-i) for ssh connections."
+ print("ERROR: Must provide an identity file (-i) for ssh connections.", file=stderr)
sys.exit(1)
if opts.key_pair is None:
- print >> stderr, "ERROR: Must provide a key pair name (-k) to use on instances."
+ print("ERROR: Must provide a key pair name (-k) to use on instances.", file=stderr)
sys.exit(1)
user_data_content = None
@@ -431,7 +436,7 @@ def launch_cluster(conn, opts, cluster_name):
with open(opts.user_data) as user_data_file:
user_data_content = user_data_file.read()
- print "Setting up security groups..."
+ print("Setting up security groups...")
master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id)
slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id)
authorized_address = opts.authorized_address
@@ -497,8 +502,8 @@ def launch_cluster(conn, opts, cluster_name):
existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
die_on_error=False)
if existing_slaves or (existing_masters and not opts.use_existing_master):
- print >> stderr, ("ERROR: There are already instances running in " +
- "group %s or %s" % (master_group.name, slave_group.name))
+ print("ERROR: There are already instances running in group %s or %s" %
+ (master_group.name, slave_group.name), file=stderr)
sys.exit(1)
# Figure out Spark AMI
@@ -511,12 +516,12 @@ def launch_cluster(conn, opts, cluster_name):
additional_group_ids = [sg.id
for sg in conn.get_all_security_groups()
if opts.additional_security_group in (sg.name, sg.id)]
- print "Launching instances..."
+ print("Launching instances...")
try:
image = conn.get_all_images(image_ids=[opts.ami])[0]
except:
- print >> stderr, "Could not find AMI " + opts.ami
+ print("Could not find AMI " + opts.ami, file=stderr)
sys.exit(1)
# Create block device mapping so that we can add EBS volumes if asked to.
@@ -542,8 +547,8 @@ def launch_cluster(conn, opts, cluster_name):
# Launch slaves
if opts.spot_price is not None:
# Launch spot instances with the requested price
- print ("Requesting %d slaves as spot instances with price $%.3f" %
- (opts.slaves, opts.spot_price))
+ print("Requesting %d slaves as spot instances with price $%.3f" %
+ (opts.slaves, opts.spot_price))
zones = get_zones(conn, opts)
num_zones = len(zones)
i = 0
@@ -566,7 +571,7 @@ def launch_cluster(conn, opts, cluster_name):
my_req_ids += [req.id for req in slave_reqs]
i += 1
- print "Waiting for spot instances to be granted..."
+ print("Waiting for spot instances to be granted...")
try:
while True:
time.sleep(10)
@@ -579,24 +584,24 @@ def launch_cluster(conn, opts, cluster_name):
if i in id_to_req and id_to_req[i].state == "active":
active_instance_ids.append(id_to_req[i].instance_id)
if len(active_instance_ids) == opts.slaves:
- print "All %d slaves granted" % opts.slaves
+ print("All %d slaves granted" % opts.slaves)
reservations = conn.get_all_reservations(active_instance_ids)
slave_nodes = []
for r in reservations:
slave_nodes += r.instances
break
else:
- print "%d of %d slaves granted, waiting longer" % (
- len(active_instance_ids), opts.slaves)
+ print("%d of %d slaves granted, waiting longer" % (
+ len(active_instance_ids), opts.slaves))
except:
- print "Canceling spot instance requests"
+ print("Canceling spot instance requests")
conn.cancel_spot_instance_requests(my_req_ids)
# Log a warning if any of these requests actually launched instances:
(master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
running = len(master_nodes) + len(slave_nodes)
if running:
- print >> stderr, ("WARNING: %d instances are still running" % running)
+ print(("WARNING: %d instances are still running" % running), file=stderr)
sys.exit(0)
else:
# Launch non-spot instances
@@ -618,16 +623,16 @@ def launch_cluster(conn, opts, cluster_name):
placement_group=opts.placement_group,
user_data=user_data_content)
slave_nodes += slave_res.instances
- print "Launched {s} slave{plural_s} in {z}, regid = {r}".format(
- s=num_slaves_this_zone,
- plural_s=('' if num_slaves_this_zone == 1 else 's'),
- z=zone,
- r=slave_res.id)
+ print("Launched {s} slave{plural_s} in {z}, regid = {r}".format(
+ s=num_slaves_this_zone,
+ plural_s=('' if num_slaves_this_zone == 1 else 's'),
+ z=zone,
+ r=slave_res.id))
i += 1
# Launch or resume masters
if existing_masters:
- print "Starting master..."
+ print("Starting master...")
for inst in existing_masters:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
@@ -650,10 +655,10 @@ def launch_cluster(conn, opts, cluster_name):
user_data=user_data_content)
master_nodes = master_res.instances
- print "Launched master in %s, regid = %s" % (zone, master_res.id)
+ print("Launched master in %s, regid = %s" % (zone, master_res.id))
# This wait time corresponds to SPARK-4983
- print "Waiting for AWS to propagate instance metadata..."
+ print("Waiting for AWS to propagate instance metadata...")
time.sleep(5)
# Give the instances descriptive names
for master in master_nodes:
@@ -674,8 +679,8 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
Get the EC2 instances in an existing cluster if available.
Returns a tuple of lists of EC2 instance objects for the masters and slaves.
"""
- print "Searching for existing cluster {c} in region {r}...".format(
- c=cluster_name, r=opts.region)
+ print("Searching for existing cluster {c} in region {r}...".format(
+ c=cluster_name, r=opts.region))
def get_instances(group_names):
"""
@@ -693,16 +698,15 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
slave_instances = get_instances([cluster_name + "-slaves"])
if any((master_instances, slave_instances)):
- print "Found {m} master{plural_m}, {s} slave{plural_s}.".format(
- m=len(master_instances),
- plural_m=('' if len(master_instances) == 1 else 's'),
- s=len(slave_instances),
- plural_s=('' if len(slave_instances) == 1 else 's'))
+ print("Found {m} master{plural_m}, {s} slave{plural_s}.".format(
+ m=len(master_instances),
+ plural_m=('' if len(master_instances) == 1 else 's'),
+ s=len(slave_instances),
+ plural_s=('' if len(slave_instances) == 1 else 's')))
if not master_instances and die_on_error:
- print >> sys.stderr, \
- "ERROR: Could not find a master for cluster {c} in region {r}.".format(
- c=cluster_name, r=opts.region)
+ print("ERROR: Could not find a master for cluster {c} in region {r}.".format(
+ c=cluster_name, r=opts.region), file=sys.stderr)
sys.exit(1)
return (master_instances, slave_instances)
@@ -713,7 +717,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
master = get_dns_name(master_nodes[0], opts.private_ips)
if deploy_ssh_key:
- print "Generating cluster's SSH key on master..."
+ print("Generating cluster's SSH key on master...")
key_setup = """
[ -f ~/.ssh/id_rsa ] ||
(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa &&
@@ -721,10 +725,10 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
"""
ssh(master, opts, key_setup)
dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
- print "Transferring cluster's SSH key to slaves..."
+ print("Transferring cluster's SSH key to slaves...")
for slave in slave_nodes:
slave_address = get_dns_name(slave, opts.private_ips)
- print slave_address
+ print(slave_address)
ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar)
modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
@@ -738,8 +742,8 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
# NOTE: We should clone the repository before running deploy_files to
# prevent ec2-variables.sh from being overwritten
- print "Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format(
- r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch)
+ print("Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format(
+ r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch))
ssh(
host=master,
opts=opts,
@@ -749,7 +753,7 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
b=opts.spark_ec2_git_branch)
)
- print "Deploying files to master..."
+ print("Deploying files to master...")
deploy_files(
conn=conn,
root_dir=SPARK_EC2_DIR + "/" + "deploy.generic",
@@ -760,25 +764,25 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
)
if opts.deploy_root_dir is not None:
- print "Deploying {s} to master...".format(s=opts.deploy_root_dir)
+ print("Deploying {s} to master...".format(s=opts.deploy_root_dir))
deploy_user_files(
root_dir=opts.deploy_root_dir,
opts=opts,
master_nodes=master_nodes
)
- print "Running setup on master..."
+ print("Running setup on master...")
setup_spark_cluster(master, opts)
- print "Done!"
+ print("Done!")
def setup_spark_cluster(master, opts):
ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
ssh(master, opts, "spark-ec2/setup.sh")
- print "Spark standalone cluster started at http://%s:8080" % master
+ print("Spark standalone cluster started at http://%s:8080" % master)
if opts.ganglia:
- print "Ganglia started at http://%s:5080/ganglia" % master
+ print("Ganglia started at http://%s:5080/ganglia" % master)
def is_ssh_available(host, opts, print_ssh_output=True):
@@ -795,7 +799,7 @@ def is_ssh_available(host, opts, print_ssh_output=True):
if s.returncode != 0 and print_ssh_output:
# extra leading newline is for spacing in wait_for_cluster_state()
- print textwrap.dedent("""\n
+ print(textwrap.dedent("""\n
Warning: SSH connection error. (This could be temporary.)
Host: {h}
SSH return code: {r}
@@ -804,7 +808,7 @@ def is_ssh_available(host, opts, print_ssh_output=True):
h=host,
r=s.returncode,
o=cmd_output.strip()
- )
+ ))
return s.returncode == 0
@@ -865,10 +869,10 @@ def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
sys.stdout.write("\n")
end_time = datetime.now()
- print "Cluster is now in '{s}' state. Waited {t} seconds.".format(
+ print("Cluster is now in '{s}' state. Waited {t} seconds.".format(
s=cluster_state,
t=(end_time - start_time).seconds
- )
+ ))
# Get number of local disks available for a given EC2 instance type.
@@ -916,8 +920,8 @@ def get_num_disks(instance_type):
if instance_type in disks_by_instance:
return disks_by_instance[instance_type]
else:
- print >> stderr, ("WARNING: Don't know number of disks on instance type %s; assuming 1"
- % instance_type)
+ print("WARNING: Don't know number of disks on instance type %s; assuming 1"
+ % instance_type, file=stderr)
return 1
@@ -951,7 +955,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
# Spark-only custom deploy
spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version)
tachyon_v = ""
- print "Deploying Spark via git hash; Tachyon won't be set up"
+ print("Deploying Spark via git hash; Tachyon won't be set up")
modules = filter(lambda x: x != "tachyon", modules)
master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes]
@@ -1067,8 +1071,8 @@ def ssh(host, opts, command):
"--key-pair parameters and try again.".format(host))
else:
raise e
- print >> stderr, \
- "Error executing remote command, retrying after 30 seconds: {0}".format(e)
+ print("Error executing remote command, retrying after 30 seconds: {0}".format(e),
+ file=stderr)
time.sleep(30)
tries = tries + 1
@@ -1107,8 +1111,8 @@ def ssh_write(host, opts, command, arguments):
elif tries > 5:
raise RuntimeError("ssh_write failed with error %s" % proc.returncode)
else:
- print >> stderr, \
- "Error {0} while executing remote command, retrying after 30 seconds".format(status)
+ print("Error {0} while executing remote command, retrying after 30 seconds".
+ format(status), file=stderr)
time.sleep(30)
tries = tries + 1
@@ -1162,42 +1166,41 @@ def real_main():
if opts.identity_file is not None:
if not os.path.exists(opts.identity_file):
- print >> stderr,\
- "ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file)
+ print("ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file),
+ file=stderr)
sys.exit(1)
file_mode = os.stat(opts.identity_file).st_mode
if not (file_mode & S_IRUSR) or not oct(file_mode)[-2:] == '00':
- print >> stderr, "ERROR: The identity file must be accessible only by you."
- print >> stderr, 'You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file)
+ print("ERROR: The identity file must be accessible only by you.", file=stderr)
+ print('You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file),
+ file=stderr)
sys.exit(1)
if opts.instance_type not in EC2_INSTANCE_TYPES:
- print >> stderr, "Warning: Unrecognized EC2 instance type for instance-type: {t}".format(
- t=opts.instance_type)
+ print("Warning: Unrecognized EC2 instance type for instance-type: {t}".format(
+ t=opts.instance_type), file=stderr)
if opts.master_instance_type != "":
if opts.master_instance_type not in EC2_INSTANCE_TYPES:
- print >> stderr, \
- "Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format(
- t=opts.master_instance_type)
+ print("Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format(
+ t=opts.master_instance_type), file=stderr)
# Since we try instance types even if we can't resolve them, we check if they resolve first
# and, if they do, see if they resolve to the same virtualization type.
if opts.instance_type in EC2_INSTANCE_TYPES and \
opts.master_instance_type in EC2_INSTANCE_TYPES:
if EC2_INSTANCE_TYPES[opts.instance_type] != \
EC2_INSTANCE_TYPES[opts.master_instance_type]:
- print >> stderr, \
- "Error: spark-ec2 currently does not support having a master and slaves " + \
- "with different AMI virtualization types."
- print >> stderr, "master instance virtualization type: {t}".format(
- t=EC2_INSTANCE_TYPES[opts.master_instance_type])
- print >> stderr, "slave instance virtualization type: {t}".format(
- t=EC2_INSTANCE_TYPES[opts.instance_type])
+ print("Error: spark-ec2 currently does not support having a master and slaves "
+ "with different AMI virtualization types.", file=stderr)
+ print("master instance virtualization type: {t}".format(
+ t=EC2_INSTANCE_TYPES[opts.master_instance_type]), file=stderr)
+ print("slave instance virtualization type: {t}".format(
+ t=EC2_INSTANCE_TYPES[opts.instance_type]), file=stderr)
sys.exit(1)
if opts.ebs_vol_num > 8:
- print >> stderr, "ebs-vol-num cannot be greater than 8"
+ print("ebs-vol-num cannot be greater than 8", file=stderr)
sys.exit(1)
# Prevent breaking ami_prefix (/, .git and startswith checks)
@@ -1206,23 +1209,22 @@ def real_main():
opts.spark_ec2_git_repo.endswith(".git") or \
not opts.spark_ec2_git_repo.startswith("https://github.com") or \
not opts.spark_ec2_git_repo.endswith("spark-ec2"):
- print >> stderr, "spark-ec2-git-repo must be a github repo and it must not have a " \
- "trailing / or .git. " \
- "Furthermore, we currently only support forks named spark-ec2."
+ print("spark-ec2-git-repo must be a github repo and it must not have a trailing / or .git. "
+ "Furthermore, we currently only support forks named spark-ec2.", file=stderr)
sys.exit(1)
if not (opts.deploy_root_dir is None or
(os.path.isabs(opts.deploy_root_dir) and
os.path.isdir(opts.deploy_root_dir) and
os.path.exists(opts.deploy_root_dir))):
- print >> stderr, "--deploy-root-dir must be an absolute path to a directory that exists " \
- "on the local file system"
+ print("--deploy-root-dir must be an absolute path to a directory that exists "
+ "on the local file system", file=stderr)
sys.exit(1)
try:
conn = ec2.connect_to_region(opts.region)
except Exception as e:
- print >> stderr, (e)
+ print((e), file=stderr)
sys.exit(1)
# Select an AZ at random if it was not specified.
@@ -1231,7 +1233,7 @@ def real_main():
if action == "launch":
if opts.slaves <= 0:
- print >> sys.stderr, "ERROR: You have to start at least 1 slave"
+ print("ERROR: You have to start at least 1 slave", file=sys.stderr)
sys.exit(1)
if opts.resume:
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
@@ -1250,18 +1252,18 @@ def real_main():
conn, opts, cluster_name, die_on_error=False)
if any(master_nodes + slave_nodes):
- print "The following instances will be terminated:"
+ print("The following instances will be terminated:")
for inst in master_nodes + slave_nodes:
- print "> %s" % get_dns_name(inst, opts.private_ips)
- print "ALL DATA ON ALL NODES WILL BE LOST!!"
+ print("> %s" % get_dns_name(inst, opts.private_ips))
+ print("ALL DATA ON ALL NODES WILL BE LOST!!")
msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name)
response = raw_input(msg)
if response == "y":
- print "Terminating master..."
+ print("Terminating master...")
for inst in master_nodes:
inst.terminate()
- print "Terminating slaves..."
+ print("Terminating slaves...")
for inst in slave_nodes:
inst.terminate()
@@ -1274,16 +1276,16 @@ def real_main():
cluster_instances=(master_nodes + slave_nodes),
cluster_state='terminated'
)
- print "Deleting security groups (this will take some time)..."
+ print("Deleting security groups (this will take some time)...")
attempt = 1
while attempt <= 3:
- print "Attempt %d" % attempt
+ print("Attempt %d" % attempt)
groups = [g for g in conn.get_all_security_groups() if g.name in group_names]
success = True
# Delete individual rules in all groups before deleting groups to
# remove dependencies between them
for group in groups:
- print "Deleting rules in security group " + group.name
+ print("Deleting rules in security group " + group.name)
for rule in group.rules:
for grant in rule.grants:
success &= group.revoke(ip_protocol=rule.ip_protocol,
@@ -1298,10 +1300,10 @@ def real_main():
try:
# It is needed to use group_id to make it work with VPC
conn.delete_security_group(group_id=group.id)
- print "Deleted security group %s" % group.name
+ print("Deleted security group %s" % group.name)
except boto.exception.EC2ResponseError:
success = False
- print "Failed to delete security group %s" % group.name
+ print("Failed to delete security group %s" % group.name)
# Unfortunately, group.revoke() returns True even if a rule was not
# deleted, so this needs to be rerun if something fails
@@ -1311,17 +1313,16 @@ def real_main():
attempt += 1
if not success:
- print "Failed to delete all security groups after 3 tries."
- print "Try re-running in a few minutes."
+ print("Failed to delete all security groups after 3 tries.")
+ print("Try re-running in a few minutes.")
elif action == "login":
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
if not master_nodes[0].public_dns_name and not opts.private_ips:
- print "Master has no public DNS name. Maybe you meant to specify " \
- "--private-ips?"
+ print("Master has no public DNS name. Maybe you meant to specify --private-ips?")
else:
master = get_dns_name(master_nodes[0], opts.private_ips)
- print "Logging into master " + master + "..."
+ print("Logging into master " + master + "...")
proxy_opt = []
if opts.proxy_port is not None:
proxy_opt = ['-D', opts.proxy_port]
@@ -1336,19 +1337,18 @@ def real_main():
if response == "y":
(master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
- print "Rebooting slaves..."
+ print("Rebooting slaves...")
for inst in slave_nodes:
if inst.state not in ["shutting-down", "terminated"]:
- print "Rebooting " + inst.id
+ print("Rebooting " + inst.id)
inst.reboot()
elif action == "get-master":
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
if not master_nodes[0].public_dns_name and not opts.private_ips:
- print "Master has no public DNS name. Maybe you meant to specify " \
- "--private-ips?"
+ print("Master has no public DNS name. Maybe you meant to specify --private-ips?")
else:
- print get_dns_name(master_nodes[0], opts.private_ips)
+ print(get_dns_name(master_nodes[0], opts.private_ips))
elif action == "stop":
response = raw_input(
@@ -1361,11 +1361,11 @@ def real_main():
if response == "y":
(master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
- print "Stopping master..."
+ print("Stopping master...")
for inst in master_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.stop()
- print "Stopping slaves..."
+ print("Stopping slaves...")
for inst in slave_nodes:
if inst.state not in ["shutting-down", "terminated"]:
if inst.spot_instance_request_id:
@@ -1375,11 +1375,11 @@ def real_main():
elif action == "start":
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
- print "Starting slaves..."
+ print("Starting slaves...")
for inst in slave_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
- print "Starting master..."
+ print("Starting master...")
for inst in master_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
@@ -1403,15 +1403,15 @@ def real_main():
setup_cluster(conn, master_nodes, slave_nodes, opts, False)
else:
- print >> stderr, "Invalid action: %s" % action
+ print("Invalid action: %s" % action, file=stderr)
sys.exit(1)
def main():
try:
real_main()
- except UsageError, e:
- print >> stderr, "\nError:\n", e
+ except UsageError as e:
+ print("\nError:\n", e, file=stderr)
sys.exit(1)