Diffstat (limited to 'ec2/spark_ec2.py')
-rwxr-xr-x | ec2/spark_ec2.py | 42
1 file changed, 21 insertions, 21 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 931e4068de..6a3647b218 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -35,8 +35,7 @@ from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
 
 
 # A static URL from which to figure out the latest Mesos EC2 AMI
-LATEST_AMI_URL = "https://s3.amazonaws.com/mesos-images/ids/latest-spark-0.5"
-LATEST_STANDALONE_AMI_URL = "https://s3.amazonaws.com/spark-standalone-amis/latest-spark"
+LATEST_AMI_URL = "https://s3.amazonaws.com/mesos-images/ids/latest-spark-0.6"
 
 
 # Configure and parse our command-line arguments
@@ -59,12 +58,13 @@ def parse_args():
       "WARNING: must be 64-bit; small instances won't work")
   parser.add_option("-m", "--master-instance-type", default="",
       help="Master instance type (leave empty for same as instance-type)")
-  parser.add_option("-z", "--zone", default="us-east-1b",
+  parser.add_option("-r", "--region", default="us-east-1",
+      help="EC2 region zone to launch instances in")
+  parser.add_option("-z", "--zone", default="",
       help="Availability zone to launch instances in")
   parser.add_option("-a", "--ami", default="latest",
       help="Amazon Machine Image ID to use, or 'latest' to use latest " +
-           "availabe mesos AMI, 'standalone' for the latest available " +
-           "standalone AMI (default: latest)")
+           "available AMI (default: latest)")
   parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
       help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
            "the given local address (for use with login)")
@@ -191,19 +191,14 @@ def launch_cluster(conn, opts, cluster_name):
         "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
     sys.exit(1)
 
-  if opts.ami in ["latest", "standalone"]:
-
-    # Figure out the latest AMI from our static URL
-    if opts.ami == "latest":
-      url = LATEST_AMI_URL
-    elif opts.ami == "standalone":
-      url = LATEST_STANDALONE_AMI_URL
-
+  # Figure out the latest AMI from our static URL
+  if opts.ami == "latest":
     try:
-      opts.ami = urllib2.urlopen(url).read().strip()
+      opts.ami = urllib2.urlopen(LATEST_AMI_URL).read().strip()
       print "Latest Spark AMI: " + opts.ami
     except:
-      print >> stderr, "Could not read " + url
+      print >> stderr, "Could not read " + LATEST_AMI_URL
+      sys.exit(1)
 
   print "Launching instances..."
 
@@ -294,7 +289,7 @@ def launch_cluster(conn, opts, cluster_name):
 # Get the EC2 instances in an existing cluster if available.
 # Returns a tuple of lists of EC2 instance objects for the masters,
 # slaves and zookeeper nodes (in that order).
-def get_existing_cluster(conn, opts, cluster_name):
+def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
   print "Searching for existing cluster " + cluster_name + "..."
   reservations = conn.get_all_instances()
   master_nodes = []
@@ -310,9 +305,10 @@ def get_existing_cluster(conn, opts, cluster_name):
       slave_nodes += res.instances
     elif group_names == [cluster_name + "-zoo"]:
       zoo_nodes += res.instances
-  if master_nodes != [] and slave_nodes != []:
+  if any((master_nodes, slave_nodes, zoo_nodes)):
     print ("Found %d master(s), %d slaves, %d ZooKeeper nodes" %
            (len(master_nodes), len(slave_nodes), len(zoo_nodes)))
+  if (master_nodes != [] and slave_nodes != []) or not die_on_error:
     return (master_nodes, slave_nodes, zoo_nodes)
   else:
     if master_nodes == [] and slave_nodes != []:
@@ -369,6 +365,7 @@ def get_num_disks(instance_type):
   # From http://docs.amazonwebservices.com/AWSEC2/latest/UserGuide/index.html?InstanceStorage.html
   disks_by_instance = {
     "m1.small": 1,
+    "m1.medium": 1,
     "m1.large": 2,
     "m1.xlarge": 4,
     "t1.micro": 1,
@@ -400,10 +397,12 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
   num_disks = get_num_disks(opts.instance_type)
   hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
   mapred_local_dirs = "/mnt/hadoop/mrlocal"
+  spark_local_dirs = "/mnt/spark"
   if num_disks > 1:
     for i in range(2, num_disks + 1):
       hdfs_data_dirs += ",/mnt%d/ephemeral-hdfs/data" % i
       mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
+      spark_local_dirs += ",/mnt%d/spark" % i
 
   if zoo_nodes != []:
     zoo_list = '\n'.join([i.public_dns_name for i in zoo_nodes])
@@ -423,7 +422,8 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
     "zoo_list": zoo_list,
     "cluster_url": cluster_url,
     "hdfs_data_dirs": hdfs_data_dirs,
-    "mapred_local_dirs": mapred_local_dirs
+    "mapred_local_dirs": mapred_local_dirs,
+    "spark_local_dirs": spark_local_dirs
   }
 
   # Create a temp directory in which we will place all the files to be
@@ -470,7 +470,7 @@ def ssh(host, opts, command):
 
 def main():
   (opts, action, cluster_name) = parse_args()
-  conn = boto.connect_ec2()
+  conn = boto.ec2.connect_to_region(opts.region)
 
   # Select an AZ at random if it was not specified.
   if opts.zone == "":
@@ -492,7 +492,7 @@ def main():
           "Destroy cluster " + cluster_name + " (y/N): ")
     if response == "y":
       (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
-        conn, opts, cluster_name)
+        conn, opts, cluster_name, die_on_error=False)
       print "Terminating master..."
       for inst in master_nodes:
         inst.terminate()
@@ -527,7 +527,7 @@ def main():
          "Stop cluster " + cluster_name + " (y/N): ")
     if response == "y":
       (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
-        conn, opts, cluster_name)
+        conn, opts, cluster_name, die_on_error=False)
      print "Stopping master..."
      for inst in master_nodes:
        if inst.state not in ["shutting-down", "terminated"]:
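Note on the change: the commit replaces the hard-coded boto.connect_ec2() call with boto.ec2.connect_to_region(opts.region), driven by the new -r/--region flag, and threads a die_on_error parameter through get_existing_cluster so the destroy and stop actions can proceed even when only part of a cluster is found. Below is a minimal sketch (not part of the commit) of how the region-aware connection behaves under boto 2.x; the region value stands in for opts.region, and it assumes AWS credentials are already configured in the environment or in ~/.boto:

  import boto.ec2

  region = "us-east-1"                       # stand-in for opts.region
  conn = boto.ec2.connect_to_region(region)  # region-aware EC2 connection
  if conn is None:
      # boto 2.x returns None for an unrecognized region name, so a bad
      # --region value can be caught before any API call is attempted
      raise SystemExit("Unknown EC2 region: " + region)
  print(conn.region.name)                    # -> "us-east-1"

With die_on_error=False, get_existing_cluster returns whatever master, slave, and ZooKeeper lists it found (possibly empty) instead of exiting, which is what destroy and stop need when tearing down a partially launched cluster.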