diff options
author | Davies Liu <davies@databricks.com> | 2015-04-16 16:20:57 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-04-16 16:20:57 -0700 |
commit | 04e44b37cc04f62fbf9e08c7076349e0a4d12ea8 (patch) | |
tree | b6429253955210445ddc37faa4d5166ea25a91e2 /ec2 | |
parent | 55f553a979db925aa0c3559f7e80b99d2bf3feb4 (diff) | |
download | spark-04e44b37cc04f62fbf9e08c7076349e0a4d12ea8.tar.gz spark-04e44b37cc04f62fbf9e08c7076349e0a4d12ea8.tar.bz2 spark-04e44b37cc04f62fbf9e08c7076349e0a4d12ea8.zip |
[SPARK-4897] [PySpark] Python 3 support
This PR update PySpark to support Python 3 (tested with 3.4).
Known issue: unpickle array from Pyrolite is broken in Python 3, those tests are skipped.
TODO: ec2/spark-ec2.py is not fully tested with python3.
Author: Davies Liu <davies@databricks.com>
Author: twneale <twneale@gmail.com>
Author: Josh Rosen <joshrosen@databricks.com>
Closes #5173 from davies/python3 and squashes the following commits:
d7d6323 [Davies Liu] fix tests
6c52a98 [Davies Liu] fix mllib test
99e334f [Davies Liu] update timeout
b716610 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
cafd5ec [Davies Liu] adddress comments from @mengxr
bf225d7 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
179fc8d [Davies Liu] tuning flaky tests
8c8b957 [Davies Liu] fix ResourceWarning in Python 3
5c57c95 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
4006829 [Davies Liu] fix test
2fc0066 [Davies Liu] add python3 path
71535e9 [Davies Liu] fix xrange and divide
5a55ab4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
125f12c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ed498c8 [Davies Liu] fix compatibility with python 3
820e649 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
e8ce8c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ad7c374 [Davies Liu] fix mllib test and warning
ef1fc2f [Davies Liu] fix tests
4eee14a [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
20112ff [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
59bb492 [Davies Liu] fix tests
1da268c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ca0fdd3 [Davies Liu] fix code style
9563a15 [Davies Liu] add imap back for python 2
0b1ec04 [Davies Liu] make python examples work with Python 3
d2fd566 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
a716d34 [Davies Liu] test with python 3.4
f1700e8 [Davies Liu] fix test in python3
671b1db [Davies Liu] fix test in python3
692ff47 [Davies Liu] fix flaky test
7b9699f [Davies Liu] invalidate import cache for Python 3.3+
9c58497 [Davies Liu] fix kill worker
309bfbf [Davies Liu] keep compatibility
5707476 [Davies Liu] cleanup, fix hash of string in 3.3+
8662d5b [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
f53e1f0 [Davies Liu] fix tests
70b6b73 [Davies Liu] compile ec2/spark_ec2.py in python 3
a39167e [Davies Liu] support customize class in __main__
814c77b [Davies Liu] run unittests with python 3
7f4476e [Davies Liu] mllib tests passed
d737924 [Davies Liu] pass ml tests
375ea17 [Davies Liu] SQL tests pass
6cc42a9 [Davies Liu] rename
431a8de [Davies Liu] streaming tests pass
78901a7 [Davies Liu] fix hash of serializer in Python 3
24b2f2e [Davies Liu] pass all RDD tests
35f48fe [Davies Liu] run future again
1eebac2 [Davies Liu] fix conflict in ec2/spark_ec2.py
6e3c21d [Davies Liu] make cloudpickle work with Python3
2fb2db3 [Josh Rosen] Guard more changes behind sys.version; still doesn't run
1aa5e8f [twneale] Turned out `pickle.DictionaryType is dict` == True, so swapped it out
7354371 [twneale] buffer --> memoryview I'm not super sure if this a valid change, but the 2.7 docs recommend using memoryview over buffer where possible, so hoping it'll work.
b69ccdf [twneale] Uses the pure python pickle._Pickler instead of c-extension _pickle.Pickler. It appears pyspark 2.7 uses the pure python pickler as well, so this shouldn't degrade pickling performance (?).
f40d925 [twneale] xrange --> range
e104215 [twneale] Replaces 2.7 types.InstsanceType with 3.4 `object`....could be horribly wrong depending on how types.InstanceType is used elsewhere in the package--see http://bugs.python.org/issue8206
79de9d0 [twneale] Replaces python2.7 `file` with 3.4 _io.TextIOWrapper
2adb42d [Josh Rosen] Fix up some import differences between Python 2 and 3
854be27 [Josh Rosen] Run `futurize` on Python code:
7c5b4ce [Josh Rosen] Remove Python 3 check in shell.py.
Diffstat (limited to 'ec2')
-rwxr-xr-x | ec2/spark_ec2.py | 262 |
1 files changed, 131 insertions, 131 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 0c1f24761d..87c0818279 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -19,7 +19,7 @@ # limitations under the License. # -from __future__ import with_statement +from __future__ import with_statement, print_function import hashlib import itertools @@ -37,12 +37,17 @@ import tarfile import tempfile import textwrap import time -import urllib2 import warnings from datetime import datetime from optparse import OptionParser from sys import stderr +if sys.version < "3": + from urllib2 import urlopen, Request, HTTPError +else: + from urllib.request import urlopen, Request + from urllib.error import HTTPError + SPARK_EC2_VERSION = "1.2.1" SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -88,10 +93,10 @@ def setup_external_libs(libs): SPARK_EC2_LIB_DIR = os.path.join(SPARK_EC2_DIR, "lib") if not os.path.exists(SPARK_EC2_LIB_DIR): - print "Downloading external libraries that spark-ec2 needs from PyPI to {path}...".format( + print("Downloading external libraries that spark-ec2 needs from PyPI to {path}...".format( path=SPARK_EC2_LIB_DIR - ) - print "This should be a one-time operation." + )) + print("This should be a one-time operation.") os.mkdir(SPARK_EC2_LIB_DIR) for lib in libs: @@ -100,8 +105,8 @@ def setup_external_libs(libs): if not os.path.isdir(lib_dir): tgz_file_path = os.path.join(SPARK_EC2_LIB_DIR, versioned_lib_name + ".tar.gz") - print " - Downloading {lib}...".format(lib=lib["name"]) - download_stream = urllib2.urlopen( + print(" - Downloading {lib}...".format(lib=lib["name"])) + download_stream = urlopen( "{prefix}/{first_letter}/{lib_name}/{lib_name}-{lib_version}.tar.gz".format( prefix=PYPI_URL_PREFIX, first_letter=lib["name"][:1], @@ -113,13 +118,13 @@ def setup_external_libs(libs): tgz_file.write(download_stream.read()) with open(tgz_file_path) as tar: if hashlib.md5(tar.read()).hexdigest() != lib["md5"]: - print >> stderr, "ERROR: Got wrong md5sum for {lib}.".format(lib=lib["name"]) + print("ERROR: Got wrong md5sum for {lib}.".format(lib=lib["name"]), file=stderr) sys.exit(1) tar = tarfile.open(tgz_file_path) tar.extractall(path=SPARK_EC2_LIB_DIR) tar.close() os.remove(tgz_file_path) - print " - Finished downloading {lib}.".format(lib=lib["name"]) + print(" - Finished downloading {lib}.".format(lib=lib["name"])) sys.path.insert(1, lib_dir) @@ -299,12 +304,12 @@ def parse_args(): if home_dir is None or not os.path.isfile(home_dir + '/.boto'): if not os.path.isfile('/etc/boto.cfg'): if os.getenv('AWS_ACCESS_KEY_ID') is None: - print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " + - "must be set") + print("ERROR: The environment variable AWS_ACCESS_KEY_ID must be set", + file=stderr) sys.exit(1) if os.getenv('AWS_SECRET_ACCESS_KEY') is None: - print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " + - "must be set") + print("ERROR: The environment variable AWS_SECRET_ACCESS_KEY must be set", + file=stderr) sys.exit(1) return (opts, action, cluster_name) @@ -316,7 +321,7 @@ def get_or_make_group(conn, name, vpc_id): if len(group) > 0: return group[0] else: - print "Creating security group " + name + print("Creating security group " + name) return conn.create_security_group(name, "Spark EC2 group", vpc_id) @@ -324,18 +329,19 @@ def get_validate_spark_version(version, repo): if "." in version: version = version.replace("v", "") if version not in VALID_SPARK_VERSIONS: - print >> stderr, "Don't know about Spark version: {v}".format(v=version) + print("Don't know about Spark version: {v}".format(v=version), file=stderr) sys.exit(1) return version else: github_commit_url = "{repo}/commit/{commit_hash}".format(repo=repo, commit_hash=version) - request = urllib2.Request(github_commit_url) + request = Request(github_commit_url) request.get_method = lambda: 'HEAD' try: - response = urllib2.urlopen(request) - except urllib2.HTTPError, e: - print >> stderr, "Couldn't validate Spark commit: {url}".format(url=github_commit_url) - print >> stderr, "Received HTTP response code of {code}.".format(code=e.code) + response = urlopen(request) + except HTTPError as e: + print("Couldn't validate Spark commit: {url}".format(url=github_commit_url), + file=stderr) + print("Received HTTP response code of {code}.".format(code=e.code), file=stderr) sys.exit(1) return version @@ -394,8 +400,7 @@ def get_spark_ami(opts): instance_type = EC2_INSTANCE_TYPES[opts.instance_type] else: instance_type = "pvm" - print >> stderr,\ - "Don't recognize %s, assuming type is pvm" % opts.instance_type + print("Don't recognize %s, assuming type is pvm" % opts.instance_type, file=stderr) # URL prefix from which to fetch AMI information ami_prefix = "{r}/{b}/ami-list".format( @@ -404,10 +409,10 @@ def get_spark_ami(opts): ami_path = "%s/%s/%s" % (ami_prefix, opts.region, instance_type) try: - ami = urllib2.urlopen(ami_path).read().strip() - print "Spark AMI: " + ami + ami = urlopen(ami_path).read().strip() + print("Spark AMI: " + ami) except: - print >> stderr, "Could not resolve AMI at: " + ami_path + print("Could not resolve AMI at: " + ami_path, file=stderr) sys.exit(1) return ami @@ -419,11 +424,11 @@ def get_spark_ami(opts): # Fails if there already instances running in the cluster's groups. def launch_cluster(conn, opts, cluster_name): if opts.identity_file is None: - print >> stderr, "ERROR: Must provide an identity file (-i) for ssh connections." + print("ERROR: Must provide an identity file (-i) for ssh connections.", file=stderr) sys.exit(1) if opts.key_pair is None: - print >> stderr, "ERROR: Must provide a key pair name (-k) to use on instances." + print("ERROR: Must provide a key pair name (-k) to use on instances.", file=stderr) sys.exit(1) user_data_content = None @@ -431,7 +436,7 @@ def launch_cluster(conn, opts, cluster_name): with open(opts.user_data) as user_data_file: user_data_content = user_data_file.read() - print "Setting up security groups..." + print("Setting up security groups...") master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id) slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id) authorized_address = opts.authorized_address @@ -497,8 +502,8 @@ def launch_cluster(conn, opts, cluster_name): existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name, die_on_error=False) if existing_slaves or (existing_masters and not opts.use_existing_master): - print >> stderr, ("ERROR: There are already instances running in " + - "group %s or %s" % (master_group.name, slave_group.name)) + print("ERROR: There are already instances running in group %s or %s" % + (master_group.name, slave_group.name), file=stderr) sys.exit(1) # Figure out Spark AMI @@ -511,12 +516,12 @@ def launch_cluster(conn, opts, cluster_name): additional_group_ids = [sg.id for sg in conn.get_all_security_groups() if opts.additional_security_group in (sg.name, sg.id)] - print "Launching instances..." + print("Launching instances...") try: image = conn.get_all_images(image_ids=[opts.ami])[0] except: - print >> stderr, "Could not find AMI " + opts.ami + print("Could not find AMI " + opts.ami, file=stderr) sys.exit(1) # Create block device mapping so that we can add EBS volumes if asked to. @@ -542,8 +547,8 @@ def launch_cluster(conn, opts, cluster_name): # Launch slaves if opts.spot_price is not None: # Launch spot instances with the requested price - print ("Requesting %d slaves as spot instances with price $%.3f" % - (opts.slaves, opts.spot_price)) + print("Requesting %d slaves as spot instances with price $%.3f" % + (opts.slaves, opts.spot_price)) zones = get_zones(conn, opts) num_zones = len(zones) i = 0 @@ -566,7 +571,7 @@ def launch_cluster(conn, opts, cluster_name): my_req_ids += [req.id for req in slave_reqs] i += 1 - print "Waiting for spot instances to be granted..." + print("Waiting for spot instances to be granted...") try: while True: time.sleep(10) @@ -579,24 +584,24 @@ def launch_cluster(conn, opts, cluster_name): if i in id_to_req and id_to_req[i].state == "active": active_instance_ids.append(id_to_req[i].instance_id) if len(active_instance_ids) == opts.slaves: - print "All %d slaves granted" % opts.slaves + print("All %d slaves granted" % opts.slaves) reservations = conn.get_all_reservations(active_instance_ids) slave_nodes = [] for r in reservations: slave_nodes += r.instances break else: - print "%d of %d slaves granted, waiting longer" % ( - len(active_instance_ids), opts.slaves) + print("%d of %d slaves granted, waiting longer" % ( + len(active_instance_ids), opts.slaves)) except: - print "Canceling spot instance requests" + print("Canceling spot instance requests") conn.cancel_spot_instance_requests(my_req_ids) # Log a warning if any of these requests actually launched instances: (master_nodes, slave_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) running = len(master_nodes) + len(slave_nodes) if running: - print >> stderr, ("WARNING: %d instances are still running" % running) + print(("WARNING: %d instances are still running" % running), file=stderr) sys.exit(0) else: # Launch non-spot instances @@ -618,16 +623,16 @@ def launch_cluster(conn, opts, cluster_name): placement_group=opts.placement_group, user_data=user_data_content) slave_nodes += slave_res.instances - print "Launched {s} slave{plural_s} in {z}, regid = {r}".format( - s=num_slaves_this_zone, - plural_s=('' if num_slaves_this_zone == 1 else 's'), - z=zone, - r=slave_res.id) + print("Launched {s} slave{plural_s} in {z}, regid = {r}".format( + s=num_slaves_this_zone, + plural_s=('' if num_slaves_this_zone == 1 else 's'), + z=zone, + r=slave_res.id)) i += 1 # Launch or resume masters if existing_masters: - print "Starting master..." + print("Starting master...") for inst in existing_masters: if inst.state not in ["shutting-down", "terminated"]: inst.start() @@ -650,10 +655,10 @@ def launch_cluster(conn, opts, cluster_name): user_data=user_data_content) master_nodes = master_res.instances - print "Launched master in %s, regid = %s" % (zone, master_res.id) + print("Launched master in %s, regid = %s" % (zone, master_res.id)) # This wait time corresponds to SPARK-4983 - print "Waiting for AWS to propagate instance metadata..." + print("Waiting for AWS to propagate instance metadata...") time.sleep(5) # Give the instances descriptive names for master in master_nodes: @@ -674,8 +679,8 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): Get the EC2 instances in an existing cluster if available. Returns a tuple of lists of EC2 instance objects for the masters and slaves. """ - print "Searching for existing cluster {c} in region {r}...".format( - c=cluster_name, r=opts.region) + print("Searching for existing cluster {c} in region {r}...".format( + c=cluster_name, r=opts.region)) def get_instances(group_names): """ @@ -693,16 +698,15 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): slave_instances = get_instances([cluster_name + "-slaves"]) if any((master_instances, slave_instances)): - print "Found {m} master{plural_m}, {s} slave{plural_s}.".format( - m=len(master_instances), - plural_m=('' if len(master_instances) == 1 else 's'), - s=len(slave_instances), - plural_s=('' if len(slave_instances) == 1 else 's')) + print("Found {m} master{plural_m}, {s} slave{plural_s}.".format( + m=len(master_instances), + plural_m=('' if len(master_instances) == 1 else 's'), + s=len(slave_instances), + plural_s=('' if len(slave_instances) == 1 else 's'))) if not master_instances and die_on_error: - print >> sys.stderr, \ - "ERROR: Could not find a master for cluster {c} in region {r}.".format( - c=cluster_name, r=opts.region) + print("ERROR: Could not find a master for cluster {c} in region {r}.".format( + c=cluster_name, r=opts.region), file=sys.stderr) sys.exit(1) return (master_instances, slave_instances) @@ -713,7 +717,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): master = get_dns_name(master_nodes[0], opts.private_ips) if deploy_ssh_key: - print "Generating cluster's SSH key on master..." + print("Generating cluster's SSH key on master...") key_setup = """ [ -f ~/.ssh/id_rsa ] || (ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa && @@ -721,10 +725,10 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): """ ssh(master, opts, key_setup) dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh']) - print "Transferring cluster's SSH key to slaves..." + print("Transferring cluster's SSH key to slaves...") for slave in slave_nodes: slave_address = get_dns_name(slave, opts.private_ips) - print slave_address + print(slave_address) ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar) modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs', @@ -738,8 +742,8 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): # NOTE: We should clone the repository before running deploy_files to # prevent ec2-variables.sh from being overwritten - print "Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format( - r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch) + print("Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format( + r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch)) ssh( host=master, opts=opts, @@ -749,7 +753,7 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): b=opts.spark_ec2_git_branch) ) - print "Deploying files to master..." + print("Deploying files to master...") deploy_files( conn=conn, root_dir=SPARK_EC2_DIR + "/" + "deploy.generic", @@ -760,25 +764,25 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): ) if opts.deploy_root_dir is not None: - print "Deploying {s} to master...".format(s=opts.deploy_root_dir) + print("Deploying {s} to master...".format(s=opts.deploy_root_dir)) deploy_user_files( root_dir=opts.deploy_root_dir, opts=opts, master_nodes=master_nodes ) - print "Running setup on master..." + print("Running setup on master...") setup_spark_cluster(master, opts) - print "Done!" + print("Done!") def setup_spark_cluster(master, opts): ssh(master, opts, "chmod u+x spark-ec2/setup.sh") ssh(master, opts, "spark-ec2/setup.sh") - print "Spark standalone cluster started at http://%s:8080" % master + print("Spark standalone cluster started at http://%s:8080" % master) if opts.ganglia: - print "Ganglia started at http://%s:5080/ganglia" % master + print("Ganglia started at http://%s:5080/ganglia" % master) def is_ssh_available(host, opts, print_ssh_output=True): @@ -795,7 +799,7 @@ def is_ssh_available(host, opts, print_ssh_output=True): if s.returncode != 0 and print_ssh_output: # extra leading newline is for spacing in wait_for_cluster_state() - print textwrap.dedent("""\n + print(textwrap.dedent("""\n Warning: SSH connection error. (This could be temporary.) Host: {h} SSH return code: {r} @@ -804,7 +808,7 @@ def is_ssh_available(host, opts, print_ssh_output=True): h=host, r=s.returncode, o=cmd_output.strip() - ) + )) return s.returncode == 0 @@ -865,10 +869,10 @@ def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state): sys.stdout.write("\n") end_time = datetime.now() - print "Cluster is now in '{s}' state. Waited {t} seconds.".format( + print("Cluster is now in '{s}' state. Waited {t} seconds.".format( s=cluster_state, t=(end_time - start_time).seconds - ) + )) # Get number of local disks available for a given EC2 instance type. @@ -916,8 +920,8 @@ def get_num_disks(instance_type): if instance_type in disks_by_instance: return disks_by_instance[instance_type] else: - print >> stderr, ("WARNING: Don't know number of disks on instance type %s; assuming 1" - % instance_type) + print("WARNING: Don't know number of disks on instance type %s; assuming 1" + % instance_type, file=stderr) return 1 @@ -951,7 +955,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): # Spark-only custom deploy spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version) tachyon_v = "" - print "Deploying Spark via git hash; Tachyon won't be set up" + print("Deploying Spark via git hash; Tachyon won't be set up") modules = filter(lambda x: x != "tachyon", modules) master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes] @@ -1067,8 +1071,8 @@ def ssh(host, opts, command): "--key-pair parameters and try again.".format(host)) else: raise e - print >> stderr, \ - "Error executing remote command, retrying after 30 seconds: {0}".format(e) + print("Error executing remote command, retrying after 30 seconds: {0}".format(e), + file=stderr) time.sleep(30) tries = tries + 1 @@ -1107,8 +1111,8 @@ def ssh_write(host, opts, command, arguments): elif tries > 5: raise RuntimeError("ssh_write failed with error %s" % proc.returncode) else: - print >> stderr, \ - "Error {0} while executing remote command, retrying after 30 seconds".format(status) + print("Error {0} while executing remote command, retrying after 30 seconds". + format(status), file=stderr) time.sleep(30) tries = tries + 1 @@ -1162,42 +1166,41 @@ def real_main(): if opts.identity_file is not None: if not os.path.exists(opts.identity_file): - print >> stderr,\ - "ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file) + print("ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file), + file=stderr) sys.exit(1) file_mode = os.stat(opts.identity_file).st_mode if not (file_mode & S_IRUSR) or not oct(file_mode)[-2:] == '00': - print >> stderr, "ERROR: The identity file must be accessible only by you." - print >> stderr, 'You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file) + print("ERROR: The identity file must be accessible only by you.", file=stderr) + print('You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file), + file=stderr) sys.exit(1) if opts.instance_type not in EC2_INSTANCE_TYPES: - print >> stderr, "Warning: Unrecognized EC2 instance type for instance-type: {t}".format( - t=opts.instance_type) + print("Warning: Unrecognized EC2 instance type for instance-type: {t}".format( + t=opts.instance_type), file=stderr) if opts.master_instance_type != "": if opts.master_instance_type not in EC2_INSTANCE_TYPES: - print >> stderr, \ - "Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format( - t=opts.master_instance_type) + print("Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format( + t=opts.master_instance_type), file=stderr) # Since we try instance types even if we can't resolve them, we check if they resolve first # and, if they do, see if they resolve to the same virtualization type. if opts.instance_type in EC2_INSTANCE_TYPES and \ opts.master_instance_type in EC2_INSTANCE_TYPES: if EC2_INSTANCE_TYPES[opts.instance_type] != \ EC2_INSTANCE_TYPES[opts.master_instance_type]: - print >> stderr, \ - "Error: spark-ec2 currently does not support having a master and slaves " + \ - "with different AMI virtualization types." - print >> stderr, "master instance virtualization type: {t}".format( - t=EC2_INSTANCE_TYPES[opts.master_instance_type]) - print >> stderr, "slave instance virtualization type: {t}".format( - t=EC2_INSTANCE_TYPES[opts.instance_type]) + print("Error: spark-ec2 currently does not support having a master and slaves " + "with different AMI virtualization types.", file=stderr) + print("master instance virtualization type: {t}".format( + t=EC2_INSTANCE_TYPES[opts.master_instance_type]), file=stderr) + print("slave instance virtualization type: {t}".format( + t=EC2_INSTANCE_TYPES[opts.instance_type]), file=stderr) sys.exit(1) if opts.ebs_vol_num > 8: - print >> stderr, "ebs-vol-num cannot be greater than 8" + print("ebs-vol-num cannot be greater than 8", file=stderr) sys.exit(1) # Prevent breaking ami_prefix (/, .git and startswith checks) @@ -1206,23 +1209,22 @@ def real_main(): opts.spark_ec2_git_repo.endswith(".git") or \ not opts.spark_ec2_git_repo.startswith("https://github.com") or \ not opts.spark_ec2_git_repo.endswith("spark-ec2"): - print >> stderr, "spark-ec2-git-repo must be a github repo and it must not have a " \ - "trailing / or .git. " \ - "Furthermore, we currently only support forks named spark-ec2." + print("spark-ec2-git-repo must be a github repo and it must not have a trailing / or .git. " + "Furthermore, we currently only support forks named spark-ec2.", file=stderr) sys.exit(1) if not (opts.deploy_root_dir is None or (os.path.isabs(opts.deploy_root_dir) and os.path.isdir(opts.deploy_root_dir) and os.path.exists(opts.deploy_root_dir))): - print >> stderr, "--deploy-root-dir must be an absolute path to a directory that exists " \ - "on the local file system" + print("--deploy-root-dir must be an absolute path to a directory that exists " + "on the local file system", file=stderr) sys.exit(1) try: conn = ec2.connect_to_region(opts.region) except Exception as e: - print >> stderr, (e) + print((e), file=stderr) sys.exit(1) # Select an AZ at random if it was not specified. @@ -1231,7 +1233,7 @@ def real_main(): if action == "launch": if opts.slaves <= 0: - print >> sys.stderr, "ERROR: You have to start at least 1 slave" + print("ERROR: You have to start at least 1 slave", file=sys.stderr) sys.exit(1) if opts.resume: (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) @@ -1250,18 +1252,18 @@ def real_main(): conn, opts, cluster_name, die_on_error=False) if any(master_nodes + slave_nodes): - print "The following instances will be terminated:" + print("The following instances will be terminated:") for inst in master_nodes + slave_nodes: - print "> %s" % get_dns_name(inst, opts.private_ips) - print "ALL DATA ON ALL NODES WILL BE LOST!!" + print("> %s" % get_dns_name(inst, opts.private_ips)) + print("ALL DATA ON ALL NODES WILL BE LOST!!") msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name) response = raw_input(msg) if response == "y": - print "Terminating master..." + print("Terminating master...") for inst in master_nodes: inst.terminate() - print "Terminating slaves..." + print("Terminating slaves...") for inst in slave_nodes: inst.terminate() @@ -1274,16 +1276,16 @@ def real_main(): cluster_instances=(master_nodes + slave_nodes), cluster_state='terminated' ) - print "Deleting security groups (this will take some time)..." + print("Deleting security groups (this will take some time)...") attempt = 1 while attempt <= 3: - print "Attempt %d" % attempt + print("Attempt %d" % attempt) groups = [g for g in conn.get_all_security_groups() if g.name in group_names] success = True # Delete individual rules in all groups before deleting groups to # remove dependencies between them for group in groups: - print "Deleting rules in security group " + group.name + print("Deleting rules in security group " + group.name) for rule in group.rules: for grant in rule.grants: success &= group.revoke(ip_protocol=rule.ip_protocol, @@ -1298,10 +1300,10 @@ def real_main(): try: # It is needed to use group_id to make it work with VPC conn.delete_security_group(group_id=group.id) - print "Deleted security group %s" % group.name + print("Deleted security group %s" % group.name) except boto.exception.EC2ResponseError: success = False - print "Failed to delete security group %s" % group.name + print("Failed to delete security group %s" % group.name) # Unfortunately, group.revoke() returns True even if a rule was not # deleted, so this needs to be rerun if something fails @@ -1311,17 +1313,16 @@ def real_main(): attempt += 1 if not success: - print "Failed to delete all security groups after 3 tries." - print "Try re-running in a few minutes." + print("Failed to delete all security groups after 3 tries.") + print("Try re-running in a few minutes.") elif action == "login": (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) if not master_nodes[0].public_dns_name and not opts.private_ips: - print "Master has no public DNS name. Maybe you meant to specify " \ - "--private-ips?" + print("Master has no public DNS name. Maybe you meant to specify --private-ips?") else: master = get_dns_name(master_nodes[0], opts.private_ips) - print "Logging into master " + master + "..." + print("Logging into master " + master + "...") proxy_opt = [] if opts.proxy_port is not None: proxy_opt = ['-D', opts.proxy_port] @@ -1336,19 +1337,18 @@ def real_main(): if response == "y": (master_nodes, slave_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) - print "Rebooting slaves..." + print("Rebooting slaves...") for inst in slave_nodes: if inst.state not in ["shutting-down", "terminated"]: - print "Rebooting " + inst.id + print("Rebooting " + inst.id) inst.reboot() elif action == "get-master": (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) if not master_nodes[0].public_dns_name and not opts.private_ips: - print "Master has no public DNS name. Maybe you meant to specify " \ - "--private-ips?" + print("Master has no public DNS name. Maybe you meant to specify --private-ips?") else: - print get_dns_name(master_nodes[0], opts.private_ips) + print(get_dns_name(master_nodes[0], opts.private_ips)) elif action == "stop": response = raw_input( @@ -1361,11 +1361,11 @@ def real_main(): if response == "y": (master_nodes, slave_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) - print "Stopping master..." + print("Stopping master...") for inst in master_nodes: if inst.state not in ["shutting-down", "terminated"]: inst.stop() - print "Stopping slaves..." + print("Stopping slaves...") for inst in slave_nodes: if inst.state not in ["shutting-down", "terminated"]: if inst.spot_instance_request_id: @@ -1375,11 +1375,11 @@ def real_main(): elif action == "start": (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) - print "Starting slaves..." + print("Starting slaves...") for inst in slave_nodes: if inst.state not in ["shutting-down", "terminated"]: inst.start() - print "Starting master..." + print("Starting master...") for inst in master_nodes: if inst.state not in ["shutting-down", "terminated"]: inst.start() @@ -1403,15 +1403,15 @@ def real_main(): setup_cluster(conn, master_nodes, slave_nodes, opts, False) else: - print >> stderr, "Invalid action: %s" % action + print("Invalid action: %s" % action, file=stderr) sys.exit(1) def main(): try: real_main() - except UsageError, e: - print >> stderr, "\nError:\n", e + except UsageError as e: + print("\nError:\n", e, file=stderr) sys.exit(1) |