path: root/ec2/spark_ec2.py
author    Denny <dennybritz@gmail.com>    2012-08-02 15:23:52 -0700
committer Denny <dennybritz@gmail.com>    2012-08-02 15:23:52 -0700
commit    aaed039e36b6b29b59e650ef075f66a6e4162449 (patch)
tree      6d27762fb55a3aa045ef04951a2fef13bd145b78 /ec2/spark_ec2.py
parent    0ee44c225e38abbf3382be6e9555ab9a35424a54 (diff)
download  spark-aaed039e36b6b29b59e650ef075f66a6e4162449.tar.gz
          spark-aaed039e36b6b29b59e650ef075f66a6e4162449.tar.bz2
          spark-aaed039e36b6b29b59e650ef075f66a6e4162449.zip
Merged standalone and mesos EC2 scripts
Diffstat (limited to 'ec2/spark_ec2.py')
-rwxr-xr-x    ec2/spark_ec2.py    80
1 file changed, 56 insertions, 24 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 0b85bbd46f..971b0c6ad7 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -36,6 +36,7 @@ from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
# A static URL from which to figure out the latest Mesos EC2 AMI
LATEST_AMI_URL = "https://s3.amazonaws.com/mesos-images/ids/latest-spark-0.5"
+LATEST_STANDALONE_AMI_URL = "https://s3.amazonaws.com/spark-standalone-amis/latest-spark"
# Configure and parse our command-line arguments
@@ -62,7 +63,8 @@ def parse_args():
help="Availability zone to launch instances in")
parser.add_option("-a", "--ami", default="latest",
help="Amazon Machine Image ID to use, or 'latest' to use latest " +
- "availabe AMI (default: latest)")
+ "availabe mesos AMI, 'standalone' for the latest available " +
+ "standalone AMI (default: latest)")
parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
"the given local address (for use with login)")
@@ -78,6 +80,11 @@ def parse_args():
parser.add_option("--spot-price", metavar="PRICE", type="float",
help="If specified, launch slaves as spot instances with the given " +
"maximum price (in dollars)")
+ parser.add_option("-c", "--cluster-type", default="mesos",
+ help="'mesos' for a mesos cluster, 'standalone' for a standalone spark cluster (default: mesos)")
+ parser.add_option("-u", "--user", default="root",
+ help="The ssh user you want to connect as (default: root)")
+
(opts, args) = parser.parse_args()
if len(args) != 2:
parser.print_help()
@@ -87,6 +94,9 @@ def parse_args():
print >> stderr, ("ERROR: The -i or --identity-file argument is " +
"required for " + action)
sys.exit(1)
+ if opts.cluster_type not in ["mesos", "standalone"] and action == "launch":
+ print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type)
+ sys.exit(1)
if os.getenv('AWS_ACCESS_KEY_ID') == None:
print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " +
"must be set")
@@ -106,7 +116,7 @@ def get_or_make_group(conn, name):
return group[0]
else:
print "Creating security group " + name
- return conn.create_security_group(name, "Mesos EC2 group")
+ return conn.create_security_group(name, "Spark EC2 group")
# Wait for a set of launched instances to exit the "pending" state
@@ -144,20 +154,22 @@ def launch_cluster(conn, opts, cluster_name):
master_group.authorize(src_group=zoo_group)
master_group.authorize('tcp', 22, 22, '0.0.0.0/0')
master_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
- master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
- master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
- master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
- master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0')
+ if opts.cluster_type == "mesos":
+ master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
+ master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
+ master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
+ master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0')
if slave_group.rules == []: # Group was just now created
slave_group.authorize(src_group=master_group)
slave_group.authorize(src_group=slave_group)
slave_group.authorize(src_group=zoo_group)
slave_group.authorize('tcp', 22, 22, '0.0.0.0/0')
slave_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
- slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0')
- slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0')
- slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
- slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
+ if opts.cluster_type == "mesos":
+ slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0')
+ slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0')
+ slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
+ slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
if zoo_group.rules == []: # Group was just now created
zoo_group.authorize(src_group=master_group)
zoo_group.authorize(src_group=slave_group)
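The security-group hunk above keeps SSH and the Spark web UI ports open for every cluster, but only opens the Hadoop/Mesos UI ports when a mesos cluster is launched. A condensed sketch of the master-group half of that logic using the boto 2 API the script already imports; the region, group name, and helper function are illustrative:

    from boto.ec2 import connect_to_region

    def open_master_ports(conn, cluster_type, group_name):
        # Create the master group and open the ports listed in the hunk above.
        group = conn.create_security_group(group_name, "Spark EC2 group")
        group.authorize('tcp', 22, 22, '0.0.0.0/0')        # SSH
        group.authorize('tcp', 8080, 8081, '0.0.0.0/0')    # Spark web UIs
        if cluster_type == "mesos":
            # Hadoop/Mesos web UIs are only needed on mesos clusters.
            for port in [50030, 50070, 60070, 38090]:
                group.authorize('tcp', port, port, '0.0.0.0/0')
        return group

    # Usage (needs AWS credentials in the environment):
    #   conn = connect_to_region("us-east-1")
    #   open_master_ports(conn, "standalone", "my-cluster-master")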
@@ -179,13 +191,19 @@ def launch_cluster(conn, opts, cluster_name):
"group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
sys.exit(1)
- if opts.ami == "latest":
+ if opts.ami in ["latest", "standalone"]:
+
# Figure out the latest AMI from our static URL
+ if opts.ami == "latest":
+ url = LATEST_AMI_URL
+ elif opts.ami == "standalone":
+ url = LATEST_STANDALONE_AMI_URL
+
try:
- opts.ami = urllib2.urlopen(LATEST_AMI_URL).read().strip()
+ opts.ami = urllib2.urlopen(url).read().strip()
print "Latest Spark AMI: " + opts.ami
except:
- print >> stderr, "Could not read " + LATEST_AMI_URL
+ print >> stderr, "Could not read " + url
print "Launching instances..."
@@ -314,14 +332,25 @@ def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_k
master = master_nodes[0].public_dns_name
if deploy_ssh_key:
print "Copying SSH key %s to master..." % opts.identity_file
- ssh(master, opts, 'mkdir -p /root/.ssh')
- scp(master, opts, opts.identity_file, '/root/.ssh/id_rsa')
+ ssh(master, opts, 'mkdir -p ~/.ssh')
+ scp(master, opts, opts.identity_file, '~/.ssh/id_rsa')
print "Running setup on master..."
+ if opts.cluster_type == "mesos":
+ setup_mesos_cluster(master, opts)
+ elif opts.cluster_type == "standalone":
+ setup_standalone_cluster(master, slave_nodes, opts)
+ print "Done!"
+
+def setup_mesos_cluster(master, opts):
ssh(master, opts, "chmod u+x mesos-ec2/setup")
ssh(master, opts, "mesos-ec2/setup %s %s %s %s" %
("generic", "none", "master", opts.swap))
- print "Done!"
+def setup_standalone_cluster(master, slave_nodes, opts):
+ slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes])
+ ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips))
+ ssh(master, opts, "/home/ec2-user/spark/bin/start-all.sh")
+
# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes, zoo_nodes):
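Setup now dispatches on the cluster type: the mesos path still runs mesos-ec2/setup on the master, while the standalone path only writes conf/slaves and runs Spark's start-all.sh over SSH. A self-contained sketch of the standalone branch; the ssh helper here is a stand-in for the script's own helper, and the remote paths are the ones used in the hunk above:

    import subprocess

    def ssh(host, identity_file, user, command):
        # Stand-in for the script's ssh() helper.
        subprocess.check_call(
            "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
            (identity_file, user, host, command), shell=True)

    def setup_standalone_cluster(master, slave_hostnames, identity_file, user):
        # Write the slave list into Spark's conf/slaves, then start master and workers.
        slave_ips = '\n'.join(slave_hostnames)
        ssh(master, identity_file, user, "echo \"%s\" > spark/conf/slaves" % slave_ips)
        ssh(master, identity_file, user, "/home/ec2-user/spark/bin/start-all.sh")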
@@ -380,9 +409,12 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
zoo_list = '\n'.join([i.public_dns_name for i in zoo_nodes])
cluster_url = "zoo://" + ",".join(
["%s:2181/mesos" % i.public_dns_name for i in zoo_nodes])
- else:
+ elif opts.cluster_type == "mesos":
zoo_list = "NONE"
cluster_url = "%s:5050" % active_master
+ elif opts.cluster_type == "standalone":
+ zoo_list = "NONE"
+ cluster_url = "%s:7077" % active_master
template_vars = {
"master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
@@ -416,7 +448,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
dest.close()
# rsync the whole directory over to the master machine
command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " +
- "'%s/' 'root@%s:/'") % (opts.identity_file, tmp_dir, active_master))
+ "'%s/' '%s@%s:~'") % (opts.identity_file, tmp_dir, opts.user, active_master))
subprocess.check_call(command, shell=True)
# Remove the temp directory we created above
shutil.rmtree(tmp_dir)
@@ -425,15 +457,15 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
# Copy a file to a given host through scp, throwing an exception if scp fails
def scp(host, opts, local_file, dest_file):
subprocess.check_call(
- "scp -q -o StrictHostKeyChecking=no -i %s '%s' 'root@%s:%s'" %
- (opts.identity_file, local_file, host, dest_file), shell=True)
+ "scp -q -o StrictHostKeyChecking=no -i %s '%s' '%s@%s:%s'" %
+ (opts.identity_file, local_file, opts.user, host, dest_file), shell=True)
# Run a command on a host through ssh, throwing an exception if ssh fails
def ssh(host, opts, command):
subprocess.check_call(
- "ssh -t -o StrictHostKeyChecking=no -i %s root@%s '%s'" %
- (opts.identity_file, host, command), shell=True)
+ "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
+ (opts.identity_file, opts.user, host, command), shell=True)
def main():
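The same --user value now flows through rsync, scp, and ssh instead of a hard-coded root login, which is what lets AMIs with a non-root account (the standalone path above references /home/ec2-user) share these helpers. A tiny runnable illustration of the command string the patched ssh() helper builds; the Opts tuple, key path, and host name are made up for the example:

    from collections import namedtuple

    Opts = namedtuple("Opts", ["identity_file", "user"])
    opts = Opts(identity_file="~/.ssh/spark.pem", user="ec2-user")   # illustrative values
    host = "ec2-203-0-113-10.compute-1.amazonaws.com"                # illustrative host

    command = ("ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
               (opts.identity_file, opts.user, host, "uptime"))
    print command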
@@ -480,8 +512,8 @@ def main():
proxy_opt = ""
if opts.proxy_port != None:
proxy_opt = "-D " + opts.proxy_port
- subprocess.check_call("ssh -o StrictHostKeyChecking=no -i %s %s root@%s" %
- (opts.identity_file, proxy_opt, master), shell=True)
+ subprocess.check_call("ssh -o StrictHostKeyChecking=no -i %s %s %s@%s" %
+ (opts.identity_file, proxy_opt, opts.user, master), shell=True)
elif action == "get-master":
(master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(conn, opts, cluster_name)