#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import division, print_function, with_statement

import codecs
import hashlib
import itertools
import logging
import os
import os.path
import pipes
import random
import shutil
import string
from stat import S_IRUSR
import subprocess
import sys
import tarfile
import tempfile
import textwrap
import time
import warnings
from datetime import datetime
from optparse import OptionParser
from sys import stderr

if sys.version < "3":
    from urllib2 import urlopen, Request, HTTPError
else:
    from urllib.request import urlopen, Request
    from urllib.error import HTTPError
    raw_input = input
    xrange = range

SPARK_EC2_VERSION = "1.4.0"
SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))

VALID_SPARK_VERSIONS = set([
    "0.7.3",
    "0.8.0",
    "0.8.1",
    "0.9.0",
    "0.9.1",
    "0.9.2",
    "1.0.0",
    "1.0.1",
    "1.0.2",
    "1.1.0",
    "1.1.1",
    "1.2.0",
    "1.2.1",
    "1.3.0",
    "1.3.1",
    "1.4.0",
])

SPARK_TACHYON_MAP = {
    "1.0.0": "0.4.1",
    "1.0.1": "0.4.1",
    "1.0.2": "0.4.1",
    "1.1.0": "0.5.0",
    "1.1.1": "0.5.0",
    "1.2.0": "0.5.0",
    "1.2.1": "0.5.0",
    "1.3.0": "0.5.0",
    "1.3.1": "0.5.0",
    "1.4.0": "0.6.4",
}

DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION
DEFAULT_SPARK_GITHUB_REPO = "https://github.com/apache/spark"

# Default location to get the spark-ec2 scripts (and ami-list) from
DEFAULT_SPARK_EC2_GITHUB_REPO = "https://github.com/mesos/spark-ec2"
DEFAULT_SPARK_EC2_BRANCH = "branch-1.4"


def setup_external_libs(libs):
    """
    Download external libraries from PyPI to SPARK_EC2_DIR/lib/ and prepend them to our PATH.
    """
    PYPI_URL_PREFIX = "https://pypi.python.org/packages/source"

    SPARK_EC2_LIB_DIR = os.path.join(SPARK_EC2_DIR, "lib")

    if not os.path.exists(SPARK_EC2_LIB_DIR):
        print("Downloading external libraries that spark-ec2 needs from PyPI to {path}...".format(
            path=SPARK_EC2_LIB_DIR
        ))
        print("This should be a one-time operation.")
        os.mkdir(SPARK_EC2_LIB_DIR)

    for lib in libs:
        versioned_lib_name = "{n}-{v}".format(n=lib["name"], v=lib["version"])
        lib_dir = os.path.join(SPARK_EC2_LIB_DIR, versioned_lib_name)

        if not os.path.isdir(lib_dir):
            tgz_file_path = os.path.join(SPARK_EC2_LIB_DIR, versioned_lib_name + ".tar.gz")
            print(" - Downloading {lib}...".format(lib=lib["name"]))
            download_stream = urlopen(
                "{prefix}/{first_letter}/{lib_name}/{lib_name}-{lib_version}.tar.gz".format(
                    prefix=PYPI_URL_PREFIX,
                    first_letter=lib["name"][:1],
                    lib_name=lib["name"],
                    lib_version=lib["version"]
                )
            )
            with open(tgz_file_path, "wb") as tgz_file:
                tgz_file.write(download_stream.read())
            with open(tgz_file_path, "rb") as tar:
                if hashlib.md5(tar.read()).hexdigest() != lib["md5"]:
                    print("ERROR: Got wrong md5sum for {lib}.".format(lib=lib["name"]), file=stderr)
                    sys.exit(1)
            tar = tarfile.open(tgz_file_path)
            tar.extractall(path=SPARK_EC2_LIB_DIR)
            tar.close()
            os.remove(tgz_file_path)
            print(" - Finished downloading {lib}.".format(lib=lib["name"]))
        sys.path.insert(1, lib_dir)


# Only PyPI libraries are supported.
external_libs = [
    {
        "name": "boto",
        "version": "2.34.0",
        "md5": "5556223d2d0cc4d06dd4829e671dcecd"
    }
]

setup_external_libs(external_libs)

import boto
from boto.ec2.blockdevicemapping import BlockDeviceMapping, BlockDeviceType, EBSBlockDeviceType
from boto import ec2


class UsageError(Exception):
    pass


# Configure and parse our command-line arguments
def parse_args():
    parser = OptionParser(
        prog="spark-ec2",
        version="%prog {v}".format(v=SPARK_EC2_VERSION),
        usage="%prog [options] <action> <cluster_name>\n\n" +
              "<action> can be: launch, destroy, login, stop, start, get-master, reboot-slaves")

    parser.add_option(
        "-s", "--slaves", type="int", default=1,
        help="Number of slaves to launch (default: %default)")
    parser.add_option(
        "-w", "--wait", type="int",
        help="DEPRECATED (no longer necessary) - Seconds to wait for nodes to start")
    parser.add_option(
        "-k", "--key-pair",
        help="Key pair to use on instances")
    parser.add_option(
        "-i", "--identity-file",
        help="SSH private key file to use for logging into instances")
    parser.add_option(
        "-t", "--instance-type", default="m1.large",
        help="Type of instance to launch (default: %default). " +
             "WARNING: must be 64-bit; small instances won't work")
    parser.add_option(
        "-m", "--master-instance-type", default="",
        help="Master instance type (leave empty for same as instance-type)")
    parser.add_option(
        "-r", "--region", default="us-east-1",
        help="EC2 region used to launch instances in, or to find them in (default: %default)")
    parser.add_option(
        "-z", "--zone", default="",
        help="Availability zone to launch instances in, or 'all' to spread " +
             "slaves across multiple (an additional $0.01/Gb for bandwidth " +
             "between zones applies) (default: a single zone chosen at random)")
    parser.add_option(
        "-a", "--ami",
        help="Amazon Machine Image ID to use")
    parser.add_option(
        "-v", "--spark-version", default=DEFAULT_SPARK_VERSION,
        help="Version of Spark to use: 'X.Y.Z' or a specific git hash (default: %default)")
    parser.add_option(
        "--spark-git-repo",
        default=DEFAULT_SPARK_GITHUB_REPO,
        help="Github repo from which to checkout supplied commit hash (default: %default)")
    parser.add_option(
        "--spark-ec2-git-repo",
        default=DEFAULT_SPARK_EC2_GITHUB_REPO,
        help="Github repo from which to checkout spark-ec2 (default: %default)")
    parser.add_option(
        "--spark-ec2-git-branch",
        default=DEFAULT_SPARK_EC2_BRANCH,
        help="Github repo branch of spark-ec2 to use (default: %default)")
    parser.add_option(
        "--deploy-root-dir",
        default=None,
        help="A directory to copy into / on the first master. " +
             "Must be absolute. Note that a trailing slash is handled as per rsync: " +
             "If you omit it, the last directory of the --deploy-root-dir path will be created " +
             "in / before copying its contents. If you append the trailing slash, " +
             "the directory is not created and its contents are copied directly into /. " +
             "(default: %default).")
    parser.add_option(
        "--hadoop-major-version", default="1",
        help="Major version of Hadoop. Valid options are 1 (Hadoop 1.0.4), 2 (CDH 4.2.0), yarn " +
             "(Hadoop 2.4.0) (default: %default)")
    parser.add_option(
        "-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
        help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
             "the given local address (for use with login)")
    parser.add_option(
        "--resume", action="store_true", default=False,
        help="Resume installation on a previously launched cluster " +
             "(for debugging)")
    parser.add_option(
        "--ebs-vol-size", metavar="SIZE", type="int", default=0,
        help="Size (in GB) of each EBS volume.")
    parser.add_option(
        "--ebs-vol-type", default="standard",
        help="EBS volume type (e.g. 'gp2', 'standard').")
    parser.add_option(
        "--ebs-vol-num", type="int", default=1,
        help="Number of EBS volumes to attach to each node as /vol[x]. " +
             "The volumes will be deleted when the instances terminate. " +
             "Only possible on EBS-backed AMIs. " +
             "EBS volumes are only attached if --ebs-vol-size > 0. " +
             "Only supports up to 8 EBS volumes.")
    parser.add_option(
        "--placement-group", type="string", default=None,
        help="Which placement group to try and launch " +
             "instances into. Assumes placement group is already " +
             "created.")
    parser.add_option(
        "--swap", metavar="SWAP", type="int", default=1024,
        help="Swap space to set up per node, in MB (default: %default)")
    parser.add_option(
        "--spot-price", metavar="PRICE", type="float",
        help="If specified, launch slaves as spot instances with the given " +
             "maximum price (in dollars)")
    parser.add_option(
        "--ganglia", action="store_true", default=True,
        help="Setup Ganglia monitoring on cluster (default: %default). NOTE: " +
             "the Ganglia page will be publicly accessible")
    parser.add_option(
        "--no-ganglia", action="store_false", dest="ganglia",
        help="Disable Ganglia monitoring for the cluster")
    parser.add_option(
        "-u", "--user", default="root",
        help="The SSH user you want to connect as (default: %default)")
    parser.add_option(
        "--delete-groups", action="store_true", default=False,
        help="When destroying a cluster, delete the security groups that were created")
    parser.add_option(
        "--use-existing-master", action="store_true", default=False,
        help="Launch fresh slaves, but use an existing stopped master if possible")
    parser.add_option(
        "--worker-instances", type="int", default=1,
        help="Number of instances per worker: variable SPARK_WORKER_INSTANCES. Not used if YARN " +
             "is used as Hadoop major version (default: %default)")
    parser.add_option(
        "--master-opts", type="string", default="",
        help="Extra options to give to master through SPARK_MASTER_OPTS variable " +
             "(e.g. -Dspark.worker.timeout=180)")
    parser.add_option(
        "--user-data", type="string", default="",
        help="Path to a user-data file (most AMIs interpret this as an initialization script)")
    parser.add_option(
        "--authorized-address", type="string", default="0.0.0.0/0",
        help="Address to authorize on created security groups (default: %default)")
    parser.add_option(
        "--additional-security-group", type="string", default="",
        help="Additional security group to place the machines in")
    parser.add_option(
        "--additional-tags", type="string", default="",
        help="Additional tags to set on the machines; tags are comma-separated, while name and " +
             "value are colon separated; ex: \"Task:MySparkProject,Env:production\"")
    parser.add_option(
        "--copy-aws-credentials", action="store_true", default=False,
        help="Add AWS credentials to hadoop configuration to allow Spark to access S3")
    parser.add_option(
        "--subnet-id", default=None,
        help="VPC subnet to launch instances in")
    parser.add_option(
        "--vpc-id", default=None,
        help="VPC to launch instances in")
    parser.add_option(
        "--private-ips", action="store_true", default=False,
        help="Use private IPs for instances rather than public if VPC/subnet " +
             "requires that.")
    parser.add_option(
        "--instance-initiated-shutdown-behavior", default="stop",
        choices=["stop", "terminate"],
        help="Whether instances should terminate when shut down or just stop")
    parser.add_option(
        "--instance-profile-name", default=None,
        help="IAM profile name to launch instances under")

    (opts, args) = parser.parse_args()
    if len(args) != 2:
        parser.print_help()
        sys.exit(1)
    (action, cluster_name) = args

    # Boto config check
    # http://boto.cloudhackers.com/en/latest/boto_config_tut.html
    home_dir = os.getenv('HOME')
    if home_dir is None or not os.path.isfile(home_dir + '/.boto'):
        if not os.path.isfile('/etc/boto.cfg'):
            # If there is no boto config, check aws credentials
            if not os.path.isfile(home_dir + '/.aws/credentials'):
                if os.getenv('AWS_ACCESS_KEY_ID') is None:
                    print("ERROR: The environment variable AWS_ACCESS_KEY_ID must be set",
                          file=stderr)
                    sys.exit(1)
                if os.getenv('AWS_SECRET_ACCESS_KEY') is None:
                    print("ERROR: The environment variable AWS_SECRET_ACCESS_KEY must be set",
                          file=stderr)
                    sys.exit(1)
    return (opts, action, cluster_name)


# Get the EC2 security group of the given name, creating it if it doesn't exist
def get_or_make_group(conn, name, vpc_id):
    groups = conn.get_all_security_groups()
    group = [g for g in groups if g.name == name]
    if len(group) > 0:
        return group[0]
    else:
        print("Creating security group " + name)
        return conn.create_security_group(name, "Spark EC2 group", vpc_id)


def get_validate_spark_version(version, repo):
    if "." in version:
        version = version.replace("v", "")
        if version not in VALID_SPARK_VERSIONS:
            print("Don't know about Spark version: {v}".format(v=version), file=stderr)
            sys.exit(1)
        return version
    else:
        github_commit_url = "{repo}/commit/{commit_hash}".format(repo=repo, commit_hash=version)
        request = Request(github_commit_url)
        request.get_method = lambda: 'HEAD'
        try:
            response = urlopen(request)
        except HTTPError as e:
            print("Couldn't validate Spark commit: {url}".format(url=github_commit_url),
                  file=stderr)
            print("Received HTTP response code of {code}.".format(code=e.code), file=stderr)
            sys.exit(1)
        return version


# Source: http://aws.amazon.com/amazon-linux-ami/instance-type-matrix/
# Last Updated: 2015-06-19
# For easy maintainability, please keep this manually-inputted dictionary sorted by key.
EC2_INSTANCE_TYPES = {
    "c1.medium": "pvm",
    "c1.xlarge": "pvm",
    "c3.large": "pvm",
    "c3.xlarge": "pvm",
    "c3.2xlarge": "pvm",
    "c3.4xlarge": "pvm",
    "c3.8xlarge": "pvm",
    "c4.large": "hvm",
    "c4.xlarge": "hvm",
    "c4.2xlarge": "hvm",
    "c4.4xlarge": "hvm",
    "c4.8xlarge": "hvm",
    "cc1.4xlarge": "hvm",
    "cc2.8xlarge": "hvm",
    "cg1.4xlarge": "hvm",
    "cr1.8xlarge": "hvm",
    "d2.xlarge": "hvm",
    "d2.2xlarge": "hvm",
    "d2.4xlarge": "hvm",
    "d2.8xlarge": "hvm",
    "g2.2xlarge": "hvm",
    "g2.8xlarge": "hvm",
    "hi1.4xlarge": "pvm",
    "hs1.8xlarge": "pvm",
    "i2.xlarge": "hvm",
    "i2.2xlarge": "hvm",
    "i2.4xlarge": "hvm",
    "i2.8xlarge": "hvm",
    "m1.small": "pvm",
    "m1.medium": "pvm",
    "m1.large": "pvm",
    "m1.xlarge": "pvm",
    "m2.xlarge": "pvm",
    "m2.2xlarge": "pvm",
    "m2.4xlarge": "pvm",
    "m3.medium": "hvm",
    "m3.large": "hvm",
    "m3.xlarge": "hvm",
    "m3.2xlarge": "hvm",
    "m4.large": "hvm",
    "m4.xlarge": "hvm",
    "m4.2xlarge": "hvm",
    "m4.4xlarge": "hvm",
    "m4.10xlarge": "hvm",
    "r3.large": "hvm",
    "r3.xlarge": "hvm",
    "r3.2xlarge": "hvm",
    "r3.4xlarge": "hvm",
    "r3.8xlarge": "hvm",
    "t1.micro": "pvm",
    "t2.micro": "hvm",
    "t2.small": "hvm",
    "t2.medium": "hvm",
    "t2.large": "hvm",
}


def get_tachyon_version(spark_version):
    return SPARK_TACHYON_MAP.get(spark_version, "")


# Attempt to resolve an appropriate AMI given the architecture and region of the request.
def get_spark_ami(opts):
    if opts.instance_type in EC2_INSTANCE_TYPES:
        instance_type = EC2_INSTANCE_TYPES[opts.instance_type]
    else:
        instance_type = "pvm"
        print("Don't recognize %s, assuming type is pvm" % opts.instance_type, file=stderr)

    # URL prefix from which to fetch AMI information
    ami_prefix = "{r}/{b}/ami-list".format(
        r=opts.spark_ec2_git_repo.replace("https://github.com", "https://raw.github.com", 1),
        b=opts.spark_ec2_git_branch)

    ami_path = "%s/%s/%s" % (ami_prefix, opts.region, instance_type)
    reader = codecs.getreader("ascii")
    try:
        ami = reader(urlopen(ami_path)).read().strip()
    except:
        print("Could not resolve AMI at: " + ami_path, file=stderr)
        sys.exit(1)

    print("Spark AMI: " + ami)
    return ami


# Launch a cluster of the given name, by setting up its security groups,
# and then starting new instances in them.
# Returns a tuple of EC2 reservation objects for the master and slaves
# Fails if there are already instances running in the cluster's groups.
def launch_cluster(conn, opts, cluster_name):
    if opts.identity_file is None:
        print("ERROR: Must provide an identity file (-i) for ssh connections.", file=stderr)
        sys.exit(1)

    if opts.key_pair is None:
        print("ERROR: Must provide a key pair name (-k) to use on instances.", file=stderr)
        sys.exit(1)

    user_data_content = None
    if opts.user_data:
        with open(opts.user_data) as user_data_file:
            user_data_content = user_data_file.read()

    print("Setting up security groups...")
    master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id)
    slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id)
    authorized_address = opts.authorized_address
    if master_group.rules == []:  # Group was just now created
        if opts.vpc_id is None:
            master_group.authorize(src_group=master_group)
            master_group.authorize(src_group=slave_group)
        else:
            master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
                                   src_group=master_group)
            master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
                                   src_group=master_group)
            master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
                                   src_group=master_group)
            master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
                                   src_group=slave_group)
            master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
                                   src_group=slave_group)
            master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
                                   src_group=slave_group)
        master_group.authorize('tcp', 22, 22, authorized_address)
        master_group.authorize('tcp', 8080, 8081, authorized_address)
        master_group.authorize('tcp', 18080, 18080, authorized_address)
        master_group.authorize('tcp', 19999, 19999, authorized_address)
        master_group.authorize('tcp', 50030, 50030, authorized_address)
        master_group.authorize('tcp', 50070, 50070, authorized_address)
        master_group.authorize('tcp', 60070, 60070, authorized_address)
        master_group.authorize('tcp', 4040, 4045, authorized_address)
        # Rstudio (GUI for R) needs port 8787 for web access
        master_group.authorize('tcp', 8787, 8787, authorized_address)
        # HDFS NFS gateway requires 111,2049,4242 for tcp & udp
        master_group.authorize('tcp', 111, 111, authorized_address)
        master_group.authorize('udp', 111, 111, authorized_address)
        master_group.authorize('tcp', 2049, 2049, authorized_address)
        master_group.authorize('udp', 2049, 2049, authorized_address)
        master_group.authorize('tcp', 4242, 4242, authorized_address)
        master_group.authorize('udp', 4242, 4242, authorized_address)
        # RM in YARN mode uses 8088
        master_group.authorize('tcp', 8088, 8088, authorized_address)
        if opts.ganglia:
            master_group.authorize('tcp', 5080, 5080, authorized_address)
    if slave_group.rules == []:  # Group was just now created
        if opts.vpc_id is None:
            slave_group.authorize(src_group=master_group)
            slave_group.authorize(src_group=slave_group)
        else:
            slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
                                  src_group=master_group)
            slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
                                  src_group=master_group)
            slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
                                  src_group=master_group)
            slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
                                  src_group=slave_group)
            slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
                                  src_group=slave_group)
            slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
                                  src_group=slave_group)
        slave_group.authorize('tcp', 22, 22, authorized_address)
        slave_group.authorize('tcp', 8080, 8081, authorized_address)
        slave_group.authorize('tcp', 50060, 50060, authorized_address)
        slave_group.authorize('tcp', 50075, 50075, authorized_address)
        slave_group.authorize('tcp', 60060, 60060, authorized_address)
        slave_group.authorize('tcp', 60075, 60075, authorized_address)

    # Check if instances are already running in our groups
    existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
                                                             die_on_error=False)
    if existing_slaves or (existing_masters and not opts.use_existing_master):
        print("ERROR: There are already instances running in group %s or %s" %
              (master_group.name, slave_group.name), file=stderr)
        sys.exit(1)

    # Figure out Spark AMI
    if opts.ami is None:
        opts.ami = get_spark_ami(opts)

    # we use group ids to work around https://github.com/boto/boto/issues/350
    additional_group_ids = []
    if opts.additional_security_group:
        additional_group_ids = [sg.id
                                for sg in conn.get_all_security_groups()
                                if opts.additional_security_group in (sg.name, sg.id)]
    print("Launching instances...")

    try:
        image = conn.get_all_images(image_ids=[opts.ami])[0]
    except:
        print("Could not find AMI " + opts.ami, file=stderr)
        sys.exit(1)

    # Create block device mapping so that we can add EBS volumes if asked to.
    # The first drive is attached as /dev/sds, 2nd as /dev/sdt, ... /dev/sdz
    block_map = BlockDeviceMapping()
    if opts.ebs_vol_size > 0:
        for i in range(opts.ebs_vol_num):
            device = EBSBlockDeviceType()
            device.size = opts.ebs_vol_size
            device.volume_type = opts.ebs_vol_type
            device.delete_on_termination = True
            block_map["/dev/sd" + chr(ord('s') + i)] = device

    # AWS ignores the AMI-specified block device mapping for M3 (see SPARK-3342).
    if opts.instance_type.startswith('m3.'):
        for i in range(get_num_disks(opts.instance_type)):
            dev = BlockDeviceType()
            dev.ephemeral_name = 'ephemeral%d' % i
            # The first ephemeral drive is /dev/sdb.
            name = '/dev/sd' + string.ascii_letters[i + 1]
            block_map[name] = dev

    # Launch slaves
    if opts.spot_price is not None:
        # Launch spot instances with the requested price
        print("Requesting %d slaves as spot instances with price $%.3f" %
              (opts.slaves, opts.spot_price))
        zones = get_zones(conn, opts)
        num_zones = len(zones)
        i = 0
        my_req_ids = []
        for zone in zones:
            num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
            slave_reqs = conn.request_spot_instances(
                price=opts.spot_price,
                image_id=opts.ami,
                launch_group="launch-group-%s" % cluster_name,
                placement=zone,
                count=num_slaves_this_zone,
                key_name=opts.key_pair,
                security_group_ids=[slave_group.id] + additional_group_ids,
                instance_type=opts.instance_type,
                block_device_map=block_map,
                subnet_id=opts.subnet_id,
                placement_group=opts.placement_group,
                user_data=user_data_content,
                instance_profile_name=opts.instance_profile_name)
            my_req_ids += [req.id for req in slave_reqs]
            i += 1

        print("Waiting for spot instances to be granted...")
        try:
            while True:
                time.sleep(10)
                reqs = conn.get_all_spot_instance_requests()
                id_to_req = {}
                for r in reqs:
                    id_to_req[r.id] = r
                active_instance_ids = []
                for i in my_req_ids:
                    if i in id_to_req and id_to_req[i].state == "active":
                        active_instance_ids.append(id_to_req[i].instance_id)
                if len(active_instance_ids) == opts.slaves:
                    print("All %d slaves granted" % opts.slaves)
                    reservations = conn.get_all_reservations(active_instance_ids)
                    slave_nodes = []
                    for r in reservations:
                        slave_nodes += r.instances
                    break
                else:
                    print("%d of %d slaves granted, waiting longer" % (
                        len(active_instance_ids), opts.slaves))
        except:
            print("Canceling spot instance requests")
            conn.cancel_spot_instance_requests(my_req_ids)
            # Log a warning if any of these requests actually launched instances:
            (master_nodes, slave_nodes) = get_existing_cluster(
                conn, opts, cluster_name, die_on_error=False)
            running = len(master_nodes) + len(slave_nodes)
            if running:
                print(("WARNING: %d instances are still running" % running), file=stderr)
            sys.exit(0)
    else:
        # Launch non-spot instances
        zones = get_zones(conn, opts)
        num_zones = len(zones)
        i = 0
        slave_nodes = []
        for zone in zones:
            num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
            if num_slaves_this_zone > 0:
                slave_res = image.run(
                    key_name=opts.key_pair,
                    security_group_ids=[slave_group.id] + additional_group_ids,
                    instance_type=opts.instance_type,
                    placement=zone,
                    min_count=num_slaves_this_zone,
                    max_count=num_slaves_this_zone,
                    block_device_map=block_map,
                    subnet_id=opts.subnet_id,
                    placement_group=opts.placement_group,
                    user_data=user_data_content,
                    instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior,
                    instance_profile_name=opts.instance_profile_name)
                slave_nodes += slave_res.instances
                print("Launched {s} slave{plural_s} in {z}, regid = {r}".format(
                    s=num_slaves_this_zone,
                    plural_s=('' if num_slaves_this_zone == 1 else 's'),
                    z=zone,
                    r=slave_res.id))
            i += 1

    # Launch or resume masters
    if existing_masters:
        print("Starting master...")
        for inst in existing_masters:
            if inst.state not in ["shutting-down", "terminated"]:
                inst.start()
        master_nodes = existing_masters
    else:
        master_type = opts.master_instance_type
        if master_type == "":
            master_type = opts.instance_type
        if opts.zone == 'all':
            opts.zone = random.choice(conn.get_all_zones()).name
        master_res = image.run(
            key_name=opts.key_pair,
            security_group_ids=[master_group.id] + additional_group_ids,
            instance_type=master_type,
            placement=opts.zone,
            min_count=1,
            max_count=1,
            block_device_map=block_map,
            subnet_id=opts.subnet_id,
            placement_group=opts.placement_group,
            user_data=user_data_content,
            instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior,
            instance_profile_name=opts.instance_profile_name)

        master_nodes = master_res.instances
        print("Launched master in %s, regid = %s" % (zone, master_res.id))

    # This wait time corresponds to SPARK-4983
    print("Waiting for AWS to propagate instance metadata...")
    time.sleep(15)

    # Give the instances descriptive names and set additional tags
    additional_tags = {}
    if opts.additional_tags.strip():
        additional_tags = dict(
            map(str.strip, tag.split(':', 1)) for tag in opts.additional_tags.split(',')
        )

    for master in master_nodes:
        master.add_tags(
            dict(additional_tags, Name='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id))
        )

    for slave in slave_nodes:
        slave.add_tags(
            dict(additional_tags, Name='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id))
        )

    # Return all the instances
    return (master_nodes, slave_nodes)


def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
    """
    Get the EC2 instances in an existing cluster if available.
    Returns a tuple of lists of EC2 instance objects for the masters and slaves.
    """
    print("Searching for existing cluster {c} in region {r}...".format(
        c=cluster_name, r=opts.region))

    def get_instances(group_names):
        """
        Get all non-terminated instances that belong to any of the
        provided security groups.

        EC2 reservation filters and instance states are documented here:
            http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html#options
        """
        reservations = conn.get_all_reservations(
            filters={"instance.group-name": group_names})
        instances = itertools.chain.from_iterable(r.instances for r in reservations)
        return [i for i in instances if i.state not in ["shutting-down", "terminated"]]

    master_instances = get_instances([cluster_name + "-master"])
    slave_instances = get_instances([cluster_name + "-slaves"])

    if any((master_instances, slave_instances)):
        print("Found {m} master{plural_m}, {s} slave{plural_s}.".format(
            m=len(master_instances),
            plural_m=('' if len(master_instances) == 1 else 's'),
            s=len(slave_instances),
            plural_s=('' if len(slave_instances) == 1 else 's')))

    if not master_instances and die_on_error:
        print("ERROR: Could not find a master for cluster {c} in region {r}.".format(
            c=cluster_name, r=opts.region), file=sys.stderr)
        sys.exit(1)

    return (master_instances, slave_instances)


# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
    master = get_dns_name(master_nodes[0], opts.private_ips)
    if deploy_ssh_key:
        print("Generating cluster's SSH key on master...")
        key_setup = """
          [ -f ~/.ssh/id_rsa ] ||
            (ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa &&
             cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)
        """
        ssh(master, opts, key_setup)
        dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
        print("Transferring cluster's SSH key to slaves...")
        for slave in slave_nodes:
            slave_address = get_dns_name(slave, opts.private_ips)
            print(slave_address)
            ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar)

    modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
               'mapreduce', 'spark-standalone', 'tachyon', 'rstudio']

    if opts.hadoop_major_version == "1":
        modules = list(filter(lambda x: x != "mapreduce", modules))

    if opts.ganglia:
        modules.append('ganglia')

    # Clear SPARK_WORKER_INSTANCES if running on YARN
    if opts.hadoop_major_version == "yarn":
        opts.worker_instances = ""

    # NOTE: We should clone the repository before running deploy_files to
    # prevent ec2-variables.sh from being overwritten
    print("Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format(
        r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch))
    ssh(
        host=master,
        opts=opts,
        command="rm -rf spark-ec2"
        + " && "
        + "git clone {r} -b {b} spark-ec2".format(r=opts.spark_ec2_git_repo,
                                                  b=opts.spark_ec2_git_branch)
    )

    print("Deploying files to master...")
    deploy_files(
        conn=conn,
        root_dir=SPARK_EC2_DIR + "/" + "deploy.generic",
        opts=opts,
        master_nodes=master_nodes,
        slave_nodes=slave_nodes,
        modules=modules
    )

    if opts.deploy_root_dir is not None:
        print("Deploying {s} to master...".format(s=opts.deploy_root_dir))
        deploy_user_files(
            root_dir=opts.deploy_root_dir,
            opts=opts,
            master_nodes=master_nodes
        )

    print("Running setup on master...")
    setup_spark_cluster(master, opts)
    print("Done!")


def setup_spark_cluster(master, opts):
    ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
    ssh(master, opts, "spark-ec2/setup.sh")
    print("Spark standalone cluster started at http://%s:8080" % master)

    if opts.ganglia:
        print("Ganglia started at http://%s:5080/ganglia" % master)


def is_ssh_available(host, opts, print_ssh_output=True):
    """
    Check if SSH is available on a host.
    """
    s = subprocess.Popen(
        ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
                             '%s@%s' % (opts.user, host), stringify_command('true')],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT  # we pipe stderr through stdout to preserve output order
    )
    cmd_output = s.communicate()[0]  # [1] is stderr, which we redirected to stdout

    if s.returncode != 0 and print_ssh_output:
        # extra leading newline is for spacing in wait_for_cluster_state()
        print(textwrap.dedent("""\n
            Warning: SSH connection error. (This could be temporary.)
            Host: {h}
            SSH return code: {r}
            SSH output: {o}
        """).format(
            h=host,
            r=s.returncode,
            o=cmd_output.strip()
        ))

    return s.returncode == 0


def is_cluster_ssh_available(cluster_instances, opts):
    """
    Check if SSH is available on all the instances in a cluster.
    """
    for i in cluster_instances:
        dns_name = get_dns_name(i, opts.private_ips)
        if not is_ssh_available(host=dns_name, opts=opts):
            return False
    else:
        return True


def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
    """
    Wait for all the instances in the cluster to reach a designated state.

    cluster_instances: a list of boto.ec2.instance.Instance
    cluster_state: a string representing the desired state of all the instances in the cluster
           value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
           'running', 'terminated', etc.
           (would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
    """
    sys.stdout.write(
        "Waiting for cluster to enter '{s}' state.".format(s=cluster_state)
    )
    sys.stdout.flush()

    start_time = datetime.now()
    num_attempts = 0

    while True:
        time.sleep(5 * num_attempts)  # seconds

        for i in cluster_instances:
            i.update()

        max_batch = 100
        statuses = []
        for j in xrange(0, len(cluster_instances), max_batch):
            batch = [i.id for i in cluster_instances[j:j + max_batch]]
            statuses.extend(conn.get_all_instance_status(instance_ids=batch))

        if cluster_state == 'ssh-ready':
            if all(i.state == 'running' for i in cluster_instances) and \
               all(s.system_status.status == 'ok' for s in statuses) and \
               all(s.instance_status.status == 'ok' for s in statuses) and \
               is_cluster_ssh_available(cluster_instances, opts):
                break
        else:
            if all(i.state == cluster_state for i in cluster_instances):
                break

        num_attempts += 1

        sys.stdout.write(".")
        sys.stdout.flush()

    sys.stdout.write("\n")

    end_time = datetime.now()
    print("Cluster is now in '{s}' state. Waited {t} seconds.".format(
        s=cluster_state,
        t=(end_time - start_time).seconds
    ))


# Get number of local disks available for a given EC2 instance type.
def get_num_disks(instance_type):
    # Source: http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/InstanceStorage.html
    # Last Updated: 2015-06-19
    # For easy maintainability, please keep this manually-inputted dictionary sorted by key.
    disks_by_instance = {
        "c1.medium": 1,
        "c1.xlarge": 4,
        "c3.large": 2,
        "c3.xlarge": 2,
        "c3.2xlarge": 2,
        "c3.4xlarge": 2,
        "c3.8xlarge": 2,
        "c4.large": 0,
        "c4.xlarge": 0,
        "c4.2xlarge": 0,
        "c4.4xlarge": 0,
        "c4.8xlarge": 0,
        "cc1.4xlarge": 2,
        "cc2.8xlarge": 4,
        "cg1.4xlarge": 2,
        "cr1.8xlarge": 2,
        "d2.xlarge": 3,
        "d2.2xlarge": 6,
        "d2.4xlarge": 12,
        "d2.8xlarge": 24,
        "g2.2xlarge": 1,
        "g2.8xlarge": 2,
        "hi1.4xlarge": 2,
        "hs1.8xlarge": 24,
        "i2.xlarge": 1,
        "i2.2xlarge": 2,
        "i2.4xlarge": 4,
        "i2.8xlarge": 8,
        "m1.small": 1,
        "m1.medium": 1,
        "m1.large": 2,
        "m1.xlarge": 4,
        "m2.xlarge": 1,
        "m2.2xlarge": 1,
        "m2.4xlarge": 2,
        "m3.medium": 1,
        "m3.large": 1,
        "m3.xlarge": 2,
        "m3.2xlarge": 2,
        "m4.large": 0,
        "m4.xlarge": 0,
        "m4.2xlarge": 0,
        "m4.4xlarge": 0,
        "m4.10xlarge": 0,
        "r3.large": 1,
        "r3.xlarge": 1,
        "r3.2xlarge": 1,
        "r3.4xlarge": 1,
        "r3.8xlarge": 2,
        "t1.micro": 0,
        "t2.micro": 0,
        "t2.small": 0,
        "t2.medium": 0,
        "t2.large": 0,
    }
    if instance_type in disks_by_instance:
        return disks_by_instance[instance_type]
    else:
        print("WARNING: Don't know number of disks on instance type %s; assuming 1"
              % instance_type, file=stderr)
        return 1


# Deploy the configuration file templates in a given local directory to
# a cluster, filling in any template parameters with information about the
# cluster (e.g. lists of masters and slaves). Files are only deployed to
# the first master instance in the cluster, and we expect the setup
# script to be run on that instance to copy them to other nodes.
#
# root_dir should be an absolute path to the directory with the files we want to deploy.
def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
    active_master = get_dns_name(master_nodes[0], opts.private_ips)

    num_disks = get_num_disks(opts.instance_type)
    hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
    mapred_local_dirs = "/mnt/hadoop/mrlocal"
    spark_local_dirs = "/mnt/spark"
    if num_disks > 1:
        for i in range(2, num_disks + 1):
            hdfs_data_dirs += ",/mnt%d/ephemeral-hdfs/data" % i
            mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
            spark_local_dirs += ",/mnt%d/spark" % i

    cluster_url = "%s:7077" % active_master

    if "." in opts.spark_version:
        # Pre-built Spark deploy
        spark_v = get_validate_spark_version(opts.spark_version, opts.spark_git_repo)
        tachyon_v = get_tachyon_version(spark_v)
    else:
        # Spark-only custom deploy
        spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version)
        tachyon_v = ""
        print("Deploying Spark via git hash; Tachyon won't be set up")
        modules = filter(lambda x: x != "tachyon", modules)

    master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes]
    slave_addresses = [get_dns_name(i, opts.private_ips) for i in slave_nodes]
    worker_instances_str = "%d" % opts.worker_instances if opts.worker_instances else ""
    template_vars = {
        "master_list": '\n'.join(master_addresses),
        "active_master": active_master,
        "slave_list": '\n'.join(slave_addresses),
        "cluster_url": cluster_url,
        "hdfs_data_dirs": hdfs_data_dirs,
        "mapred_local_dirs": mapred_local_dirs,
        "spark_local_dirs": spark_local_dirs,
        "swap": str(opts.swap),
        "modules": '\n'.join(modules),
        "spark_version": spark_v,
        "tachyon_version": tachyon_v,
        "hadoop_major_version": opts.hadoop_major_version,
        "spark_worker_instances": worker_instances_str,
        "spark_master_opts": opts.master_opts
    }

    if opts.copy_aws_credentials:
        template_vars["aws_access_key_id"] = conn.aws_access_key_id
        template_vars["aws_secret_access_key"] = conn.aws_secret_access_key
    else:
        template_vars["aws_access_key_id"] = ""
        template_vars["aws_secret_access_key"] = ""

    # Create a temp directory in which we will place all the files to be
    # deployed after we substitute template parameters in them
    tmp_dir = tempfile.mkdtemp()
    for path, dirs, files in os.walk(root_dir):
        if path.find(".svn") == -1:
            dest_dir = os.path.join('/', path[len(root_dir):])
            local_dir = tmp_dir + dest_dir
            if not os.path.exists(local_dir):
                os.makedirs(local_dir)
            for filename in files:
                if filename[0] not in '#.~' and filename[-1] != '~':
                    dest_file = os.path.join(dest_dir, filename)
                    local_file = tmp_dir + dest_file
                    with open(os.path.join(path, filename)) as src:
                        with open(local_file, "w") as dest:
                            text = src.read()
                            for key in template_vars:
                                text = text.replace("{{" + key + "}}", template_vars[key])
                            dest.write(text)
                            dest.close()

    # rsync the whole directory over to the master machine
    command = [
        'rsync', '-rv',
        '-e', stringify_command(ssh_command(opts)),
        "%s/" % tmp_dir,
        "%s@%s:/" % (opts.user, active_master)
    ]
    subprocess.check_call(command)
    # Remove the temp directory we created above
    shutil.rmtree(tmp_dir)


# Deploy a given local directory to a cluster, WITHOUT parameter substitution.
# Note that unlike deploy_files, this works for binary files.
# Also, it is up to the user to add (or not) the trailing slash in root_dir.
# Files are only deployed to the first master instance in the cluster.
#
# root_dir should be an absolute path.
def deploy_user_files(root_dir, opts, master_nodes):
    active_master = get_dns_name(master_nodes[0], opts.private_ips)
    command = [
        'rsync', '-rv',
        '-e', stringify_command(ssh_command(opts)),
        "%s" % root_dir,
        "%s@%s:/" % (opts.user, active_master)
    ]
    subprocess.check_call(command)


def stringify_command(parts):
    if isinstance(parts, str):
        return parts
    else:
        return ' '.join(map(pipes.quote, parts))


def ssh_args(opts):
    parts = ['-o', 'StrictHostKeyChecking=no']
    parts += ['-o', 'UserKnownHostsFile=/dev/null']
    if opts.identity_file is not None:
        parts += ['-i', opts.identity_file]
    return parts


def ssh_command(opts):
    return ['ssh'] + ssh_args(opts)


# Run a command on a host through ssh, retrying up to five times
# and then throwing an exception if ssh continues to fail.
def ssh(host, opts, command):
    tries = 0
    while True:
        try:
            return subprocess.check_call(
                ssh_command(opts) + ['-t', '-t', '%s@%s' % (opts.user, host),
                                     stringify_command(command)])
        except subprocess.CalledProcessError as e:
            if tries > 5:
                # If this was an ssh failure, provide the user with hints.
                if e.returncode == 255:
                    raise UsageError(
                        "Failed to SSH to remote host {0}.\n"
                        "Please check that you have provided the correct --identity-file and "
                        "--key-pair parameters and try again.".format(host))
                else:
                    raise e
            print("Error executing remote command, retrying after 30 seconds: {0}".format(e),
                  file=stderr)
            time.sleep(30)
            tries = tries + 1


# Backported from Python 2.7 for compatibility with 2.6 (See SPARK-1990)
def _check_output(*popenargs, **kwargs):
    if 'stdout' in kwargs:
        raise ValueError('stdout argument not allowed, it will be overridden.')
    process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
    output, unused_err = process.communicate()
    retcode = process.poll()
    if retcode:
        cmd = kwargs.get("args")
        if cmd is None:
            cmd = popenargs[0]
        raise subprocess.CalledProcessError(retcode, cmd, output=output)
    return output


def ssh_read(host, opts, command):
    return _check_output(
        ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)])


def ssh_write(host, opts, command, arguments):
    tries = 0
    while True:
        proc = subprocess.Popen(
            ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)],
            stdin=subprocess.PIPE)
        proc.stdin.write(arguments)
        proc.stdin.close()
        status = proc.wait()
        if status == 0:
            break
        elif tries > 5:
            raise RuntimeError("ssh_write failed with error %s" % proc.returncode)
        else:
            print("Error {0} while executing remote command, retrying after 30 seconds".
                  format(status), file=stderr)
            time.sleep(30)
            tries = tries + 1


# Gets a list of zones to launch instances in
def get_zones(conn, opts):
    if opts.zone == 'all':
        zones = [z.name for z in conn.get_all_zones()]
    else:
        zones = [opts.zone]
    return zones


# Gets the number of items in a partition
def get_partition(total, num_partitions, current_partitions):
    num_slaves_this_zone = total // num_partitions
    if (total % num_partitions) - current_partitions > 0:
        num_slaves_this_zone += 1
    return num_slaves_this_zone


# Gets the IP address, taking into account the --private-ips flag
def get_ip_address(instance, private_ips=False):
    ip = instance.ip_address if not private_ips else \
        instance.private_ip_address
    return ip


# Gets the DNS name, taking into account the --private-ips flag
def get_dns_name(instance, private_ips=False):
    dns = instance.public_dns_name if not private_ips else \
        instance.private_ip_address
    return dns


def real_main():
    (opts, action, cluster_name) = parse_args()

    # Input parameter validation
    get_validate_spark_version(opts.spark_version, opts.spark_git_repo)

    if opts.wait is not None:
        # NOTE: DeprecationWarnings are silent in 2.7+ by default.
        #       To show them, run Python with the -Wdefault switch.
        #       See: https://docs.python.org/3.5/whatsnew/2.7.html
        warnings.warn(
            "This option is deprecated and has no effect. "
            "spark-ec2 automatically waits as long as necessary for clusters to start up.",
            DeprecationWarning
        )

    if opts.identity_file is not None:
        if not os.path.exists(opts.identity_file):
            print("ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file),
                  file=stderr)
            sys.exit(1)

        file_mode = os.stat(opts.identity_file).st_mode
        if not (file_mode & S_IRUSR) or not oct(file_mode)[-2:] == '00':
            print("ERROR: The identity file must be accessible only by you.", file=stderr)
            print('You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file),
                  file=stderr)
            sys.exit(1)

    if opts.instance_type not in EC2_INSTANCE_TYPES:
        print("Warning: Unrecognized EC2 instance type for instance-type: {t}".format(
            t=opts.instance_type), file=stderr)

    if opts.master_instance_type != "":
        if opts.master_instance_type not in EC2_INSTANCE_TYPES:
            print("Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format(
                t=opts.master_instance_type), file=stderr)
        # Since we try instance types even if we can't resolve them, we check if they resolve first
        # and, if they do, see if they resolve to the same virtualization type.
        if opts.instance_type in EC2_INSTANCE_TYPES and \
           opts.master_instance_type in EC2_INSTANCE_TYPES:
            if EC2_INSTANCE_TYPES[opts.instance_type] != \
               EC2_INSTANCE_TYPES[opts.master_instance_type]:
                print("Error: spark-ec2 currently does not support having a master and slaves "
                      "with different AMI virtualization types.", file=stderr)
                print("master instance virtualization type: {t}".format(
                    t=EC2_INSTANCE_TYPES[opts.master_instance_type]), file=stderr)
                print("slave instance virtualization type: {t}".format(
                    t=EC2_INSTANCE_TYPES[opts.instance_type]), file=stderr)
                sys.exit(1)

    if opts.ebs_vol_num > 8:
        print("ebs-vol-num cannot be greater than 8", file=stderr)
        sys.exit(1)

    # Prevent breaking ami_prefix (/, .git and startswith checks)
    # Prevent forks with non spark-ec2 names for now.
    if opts.spark_ec2_git_repo.endswith("/") or \
            opts.spark_ec2_git_repo.endswith(".git") or \
            not opts.spark_ec2_git_repo.startswith("https://github.com") or \
            not opts.spark_ec2_git_repo.endswith("spark-ec2"):
        print("spark-ec2-git-repo must be a github repo and it must not have a trailing / or "
              ".git. Furthermore, we currently only support forks named spark-ec2.", file=stderr)
        sys.exit(1)

    if not (opts.deploy_root_dir is None or
            (os.path.isabs(opts.deploy_root_dir) and
             os.path.isdir(opts.deploy_root_dir) and
             os.path.exists(opts.deploy_root_dir))):
        print("--deploy-root-dir must be an absolute path to a directory that exists "
              "on the local file system", file=stderr)
        sys.exit(1)

    try:
        conn = ec2.connect_to_region(opts.region)
    except Exception as e:
        print((e), file=stderr)
        sys.exit(1)

    # Select an AZ at random if it was not specified.
    if opts.zone == "":
        opts.zone = random.choice(conn.get_all_zones()).name

    if action == "launch":
        if opts.slaves <= 0:
            print("ERROR: You have to start at least 1 slave", file=sys.stderr)
            sys.exit(1)
        if opts.resume:
            (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
        else:
            (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
        wait_for_cluster_state(
            conn=conn,
            opts=opts,
            cluster_instances=(master_nodes + slave_nodes),
            cluster_state='ssh-ready'
        )
        setup_cluster(conn, master_nodes, slave_nodes, opts, True)

    elif action == "destroy":
        (master_nodes, slave_nodes) = get_existing_cluster(
            conn, opts, cluster_name, die_on_error=False)

        if any(master_nodes + slave_nodes):
            print("The following instances will be terminated:")
            for inst in master_nodes + slave_nodes:
                print("> %s" % get_dns_name(inst, opts.private_ips))
            print("ALL DATA ON ALL NODES WILL BE LOST!!")

        msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name)
        response = raw_input(msg)
        if response == "y":
            print("Terminating master...")
            for inst in master_nodes:
                inst.terminate()
            print("Terminating slaves...")
            for inst in slave_nodes:
                inst.terminate()

            # Delete security groups as well
            if opts.delete_groups:
                group_names = [cluster_name + "-master", cluster_name + "-slaves"]
                wait_for_cluster_state(
                    conn=conn,
                    opts=opts,
                    cluster_instances=(master_nodes + slave_nodes),
                    cluster_state='terminated'
                )
                print("Deleting security groups (this will take some time)...")
                attempt = 1
                while attempt <= 3:
                    print("Attempt %d" % attempt)
                    groups = [g for g in conn.get_all_security_groups() if g.name in group_names]
                    success = True
                    # Delete individual rules in all groups before deleting groups to
                    # remove dependencies between them
                    for group in groups:
                        print("Deleting rules in security group " + group.name)
                        for rule in group.rules:
                            for grant in rule.grants:
                                success &= group.revoke(ip_protocol=rule.ip_protocol,
                                                        from_port=rule.from_port,
                                                        to_port=rule.to_port,
                                                        src_group=grant)

                    # Sleep for AWS eventual-consistency to catch up, and for instances
                    # to terminate
                    time.sleep(30)  # Yes, it does have to be this long :-(
                    for group in groups:
                        try:
                            # It is needed to use group_id to make it work with VPC
                            conn.delete_security_group(group_id=group.id)
                            print("Deleted security group %s" % group.name)
                        except boto.exception.EC2ResponseError:
                            success = False
                            print("Failed to delete security group %s" % group.name)

                    # Unfortunately, group.revoke() returns True even if a rule was not
                    # deleted, so this needs to be rerun if something fails
                    if success:
                        break

                    attempt += 1

                if not success:
                    print("Failed to delete all security groups after 3 tries.")
                    print("Try re-running in a few minutes.")

    elif action == "login":
        (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
        if not master_nodes[0].public_dns_name and not opts.private_ips:
            print("Master has no public DNS name. Maybe you meant to specify --private-ips?")
        else:
            master = get_dns_name(master_nodes[0], opts.private_ips)
            print("Logging into master " + master + "...")
            proxy_opt = []
            if opts.proxy_port is not None:
                proxy_opt = ['-D', opts.proxy_port]
            subprocess.check_call(
                ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)])

    elif action == "reboot-slaves":
        response = raw_input(
            "Are you sure you want to reboot the cluster " +
            cluster_name + " slaves?\n" +
            "Reboot cluster slaves " + cluster_name + " (y/N): ")
        if response == "y":
            (master_nodes, slave_nodes) = get_existing_cluster(
                conn, opts, cluster_name, die_on_error=False)
            print("Rebooting slaves...")
            for inst in slave_nodes:
                if inst.state not in ["shutting-down", "terminated"]:
                    print("Rebooting " + inst.id)
                    inst.reboot()

    elif action == "get-master":
        (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
        if not master_nodes[0].public_dns_name and not opts.private_ips:
            print("Master has no public DNS name. Maybe you meant to specify --private-ips?")
        else:
            print(get_dns_name(master_nodes[0], opts.private_ips))

    elif action == "stop":
        response = raw_input(
            "Are you sure you want to stop the cluster " +
            cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
            "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
            "AMAZON EBS IF IT IS EBS-BACKED!!\n" +
            "All data on spot-instance slaves will be lost.\n" +
            "Stop cluster " + cluster_name + " (y/N): ")
        if response == "y":
            (master_nodes, slave_nodes) = get_existing_cluster(
                conn, opts, cluster_name, die_on_error=False)
            print("Stopping master...")
            for inst in master_nodes:
                if inst.state not in ["shutting-down", "terminated"]:
                    inst.stop()
            print("Stopping slaves...")
            for inst in slave_nodes:
                if inst.state not in ["shutting-down", "terminated"]:
                    if inst.spot_instance_request_id:
                        inst.terminate()
                    else:
                        inst.stop()

    elif action == "start":
        (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
        print("Starting slaves...")
        for inst in slave_nodes:
            if inst.state not in ["shutting-down", "terminated"]:
                inst.start()
        print("Starting master...")
        for inst in master_nodes:
            if inst.state not in ["shutting-down", "terminated"]:
                inst.start()
        wait_for_cluster_state(
            conn=conn,
            opts=opts,
            cluster_instances=(master_nodes + slave_nodes),
            cluster_state='ssh-ready'
        )

        # Determine types of running instances
        existing_master_type = master_nodes[0].instance_type
        existing_slave_type = slave_nodes[0].instance_type
        # Setting opts.master_instance_type to the empty string indicates we
        # have the same instance type for the master and the slaves
        if existing_master_type == existing_slave_type:
            existing_master_type = ""
        opts.master_instance_type = existing_master_type
        opts.instance_type = existing_slave_type

        setup_cluster(conn, master_nodes, slave_nodes, opts, False)

    else:
        print("Invalid action: %s" % action, file=stderr)
        sys.exit(1)


def main():
    try:
        real_main()
    except UsageError as e:
        print("\nError:\n", e, file=stderr)
        sys.exit(1)


if __name__ == "__main__":
    logging.basicConfig()
    main()
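
# Illustrative usage (a sketch, not part of the script's behavior): the lines
# below show how the actions and options defined in parse_args() might be
# invoked from a shell, assuming this file is saved as spark_ec2.py and is
# executable. The key pair name, identity file path, and cluster name
# ("my-keypair", "~/.ssh/my-keypair.pem", "my-cluster") are placeholders.
#
#   ./spark_ec2.py -k my-keypair -i ~/.ssh/my-keypair.pem -s 2 launch my-cluster
#   ./spark_ec2.py -k my-keypair -i ~/.ssh/my-keypair.pem login my-cluster
#   ./spark_ec2.py get-master my-cluster
#   ./spark_ec2.py destroy my-cluster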