Diffstat (limited to 'ec2/spark_ec2.py')
-rwxr-xr-x  ec2/spark_ec2.py  42
1 file changed, 21 insertions(+), 21 deletions(-)
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 931e4068de..6a3647b218 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -35,8 +35,7 @@ from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
# A static URL from which to figure out the latest Mesos EC2 AMI
-LATEST_AMI_URL = "https://s3.amazonaws.com/mesos-images/ids/latest-spark-0.5"
-LATEST_STANDALONE_AMI_URL = "https://s3.amazonaws.com/spark-standalone-amis/latest-spark"
+LATEST_AMI_URL = "https://s3.amazonaws.com/mesos-images/ids/latest-spark-0.6"
# Configure and parse our command-line arguments
@@ -59,12 +58,13 @@ def parse_args():
"WARNING: must be 64-bit; small instances won't work")
parser.add_option("-m", "--master-instance-type", default="",
help="Master instance type (leave empty for same as instance-type)")
- parser.add_option("-z", "--zone", default="us-east-1b",
+ parser.add_option("-r", "--region", default="us-east-1",
+ help="EC2 region zone to launch instances in")
+ parser.add_option("-z", "--zone", default="",
help="Availability zone to launch instances in")
parser.add_option("-a", "--ami", default="latest",
help="Amazon Machine Image ID to use, or 'latest' to use latest " +
- "availabe mesos AMI, 'standalone' for the latest available " +
- "standalone AMI (default: latest)")
+ "available AMI (default: latest)")
parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
"the given local address (for use with login)")
@@ -191,19 +191,14 @@ def launch_cluster(conn, opts, cluster_name):
"group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
sys.exit(1)
- if opts.ami in ["latest", "standalone"]:
-
- # Figure out the latest AMI from our static URL
- if opts.ami == "latest":
- url = LATEST_AMI_URL
- elif opts.ami == "standalone":
- url = LATEST_STANDALONE_AMI_URL
-
+ # Figure out the latest AMI from our static URL
+ if opts.ami == "latest":
try:
- opts.ami = urllib2.urlopen(url).read().strip()
+ opts.ami = urllib2.urlopen(LATEST_AMI_URL).read().strip()
print "Latest Spark AMI: " + opts.ami
except:
- print >> stderr, "Could not read " + url
+ print >> stderr, "Could not read " + LATEST_AMI_URL
+ sys.exit(1)
print "Launching instances..."
@@ -294,7 +289,7 @@ def launch_cluster(conn, opts, cluster_name):
# Get the EC2 instances in an existing cluster if available.
# Returns a tuple of lists of EC2 instance objects for the masters,
# slaves and zookeeper nodes (in that order).
-def get_existing_cluster(conn, opts, cluster_name):
+def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
print "Searching for existing cluster " + cluster_name + "..."
reservations = conn.get_all_instances()
master_nodes = []
@@ -310,9 +305,10 @@ def get_existing_cluster(conn, opts, cluster_name):
slave_nodes += res.instances
elif group_names == [cluster_name + "-zoo"]:
zoo_nodes += res.instances
- if master_nodes != [] and slave_nodes != []:
+ if any((master_nodes, slave_nodes, zoo_nodes)):
print ("Found %d master(s), %d slaves, %d ZooKeeper nodes" %
(len(master_nodes), len(slave_nodes), len(zoo_nodes)))
+ if (master_nodes != [] and slave_nodes != []) or not die_on_error:
return (master_nodes, slave_nodes, zoo_nodes)
else:
if master_nodes == [] and slave_nodes != []:
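Note: get_existing_cluster now reports node counts whenever it finds anything at all, and the new die_on_error flag lets destructive callers (the destroy and stop paths further down) receive whatever lists exist instead of exiting when the master or slave group is empty. A hedged sketch of a hypothetical caller that tolerates such a partial cluster (conn is a boto EC2 connection; get_existing_cluster is the function shown above):

    # Hypothetical cleanup helper, not part of the script itself.
    def destroy_partial_cluster(conn, opts, cluster_name):
        (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
            conn, opts, cluster_name, die_on_error=False)
        # With die_on_error=False we may get back empty lists rather than a
        # sys.exit(1), so a half-terminated cluster can still be cleaned up.
        for inst in master_nodes + slave_nodes + zoo_nodes:
            if inst.state not in ["shutting-down", "terminated"]:
                inst.terminate()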
@@ -369,6 +365,7 @@ def get_num_disks(instance_type):
# From http://docs.amazonwebservices.com/AWSEC2/latest/UserGuide/index.html?InstanceStorage.html
disks_by_instance = {
"m1.small": 1,
+ "m1.medium": 1,
"m1.large": 2,
"m1.xlarge": 4,
"t1.micro": 1,
@@ -400,10 +397,12 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
num_disks = get_num_disks(opts.instance_type)
hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
mapred_local_dirs = "/mnt/hadoop/mrlocal"
+ spark_local_dirs = "/mnt/spark"
if num_disks > 1:
for i in range(2, num_disks + 1):
hdfs_data_dirs += ",/mnt%d/ephemeral-hdfs/data" % i
mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
+ spark_local_dirs += ",/mnt%d/spark" % i
if zoo_nodes != []:
zoo_list = '\n'.join([i.public_dns_name for i in zoo_nodes])
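Note: the Spark local directories get the same per-disk treatment as the HDFS data and MapReduce local directories: the first ephemeral disk contributes /mnt/spark and each additional disk i contributes /mnt<i>/spark, presumably feeding a spark.local.dir setting through the deploy templates (an assumption; the diff only shows the template variable). A small sketch of that string construction, assuming num_disks comes from get_num_disks above:

    def spark_local_dirs_for(num_disks):
        # Build the comma-separated directory list the deploy templates expect,
        # e.g. 4 disks -> "/mnt/spark,/mnt2/spark,/mnt3/spark,/mnt4/spark"
        spark_local_dirs = "/mnt/spark"
        for i in range(2, num_disks + 1):
            spark_local_dirs += ",/mnt%d/spark" % i
        return spark_local_dirs

    print spark_local_dirs_for(4)   # /mnt/spark,/mnt2/spark,/mnt3/spark,/mnt4/spark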
@@ -423,7 +422,8 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
"zoo_list": zoo_list,
"cluster_url": cluster_url,
"hdfs_data_dirs": hdfs_data_dirs,
- "mapred_local_dirs": mapred_local_dirs
+ "mapred_local_dirs": mapred_local_dirs,
+ "spark_local_dirs": spark_local_dirs
}
# Create a temp directory in which we will place all the files to be
@@ -470,7 +470,7 @@ def ssh(host, opts, command):
def main():
(opts, action, cluster_name) = parse_args()
- conn = boto.connect_ec2()
+ conn = boto.ec2.connect_to_region(opts.region)
# Select an AZ at random if it was not specified.
if opts.zone == "":
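Note: the connection now targets opts.region instead of boto's implicit default region, and when --zone is left at its new empty default an availability zone is picked at random within that region; the selection line itself falls outside the hunk, so the random.choice below is an assumption. A hedged sketch of the pairing (boto.ec2.connect_to_region and get_all_zones are boto 2 calls):

    import random
    import boto.ec2

    def connect_and_pick_zone(region, zone=""):
        # Connect to the EC2 endpoint for the requested region (boto 2 API).
        conn = boto.ec2.connect_to_region(region)
        if zone == "":
            # The hunk's context says an AZ is chosen at random when none is
            # given; random.choice over get_all_zones() is one plausible way.
            zone = random.choice(conn.get_all_zones()).name
        return conn, zone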
@@ -492,7 +492,7 @@ def main():
"Destroy cluster " + cluster_name + " (y/N): ")
if response == "y":
(master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
- conn, opts, cluster_name)
+ conn, opts, cluster_name, die_on_error=False)
print "Terminating master..."
for inst in master_nodes:
inst.terminate()
@@ -527,7 +527,7 @@ def main():
"Stop cluster " + cluster_name + " (y/N): ")
if response == "y":
(master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
- conn, opts, cluster_name)
+ conn, opts, cluster_name, die_on_error=False)
print "Stopping master..."
for inst in master_nodes:
if inst.state not in ["shutting-down", "terminated"]: