diff options
Diffstat (limited to 'ec2/spark_ec2.py')
-rwxr-xr-x | ec2/spark_ec2.py | 80 |
1 files changed, 56 insertions, 24 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 0b85bbd46f..971b0c6ad7 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -36,6 +36,7 @@ from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType # A static URL from which to figure out the latest Mesos EC2 AMI LATEST_AMI_URL = "https://s3.amazonaws.com/mesos-images/ids/latest-spark-0.5" +LATEST_STANDALONE_AMI_URL = "https://s3.amazonaws.com/spark-standalone-amis/latest-spark" # Configure and parse our command-line arguments @@ -62,7 +63,8 @@ def parse_args(): help="Availability zone to launch instances in") parser.add_option("-a", "--ami", default="latest", help="Amazon Machine Image ID to use, or 'latest' to use latest " + - "availabe AMI (default: latest)") + "availabe mesos AMI, 'standalone' for the latest available " + + "standalone AMI (default: latest)") parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port", help="Use SSH dynamic port forwarding to create a SOCKS proxy at " + "the given local address (for use with login)") @@ -78,6 +80,11 @@ def parse_args(): parser.add_option("--spot-price", metavar="PRICE", type="float", help="If specified, launch slaves as spot instances with the given " + "maximum price (in dollars)") + parser.add_option("-c", "--cluster-type", default="mesos", + help="'mesos' for a mesos cluster, 'standalone' for a standalone spark cluster (default: mesos)") + parser.add_option("-u", "--user", default="root", + help="The ssh user you want to connect as (default: root)") + (opts, args) = parser.parse_args() if len(args) != 2: parser.print_help() @@ -87,6 +94,9 @@ def parse_args(): print >> stderr, ("ERROR: The -i or --identity-file argument is " + "required for " + action) sys.exit(1) + if opts.cluster_type not in ["mesos", "standalone"] and action == "launch": + print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type) + sys.exit(1) if os.getenv('AWS_ACCESS_KEY_ID') == None: print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " + "must be set") @@ -106,7 +116,7 @@ def get_or_make_group(conn, name): return group[0] else: print "Creating security group " + name - return conn.create_security_group(name, "Mesos EC2 group") + return conn.create_security_group(name, "Spark EC2 group") # Wait for a set of launched instances to exit the "pending" state @@ -144,20 +154,22 @@ def launch_cluster(conn, opts, cluster_name): master_group.authorize(src_group=zoo_group) master_group.authorize('tcp', 22, 22, '0.0.0.0/0') master_group.authorize('tcp', 8080, 8081, '0.0.0.0/0') - master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0') - master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0') - master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0') - master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0') + if opts.cluster_type == "mesos": + master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0') + master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0') + master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0') + master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0') if slave_group.rules == []: # Group was just now created slave_group.authorize(src_group=master_group) slave_group.authorize(src_group=slave_group) slave_group.authorize(src_group=zoo_group) slave_group.authorize('tcp', 22, 22, '0.0.0.0/0') slave_group.authorize('tcp', 8080, 8081, '0.0.0.0/0') - slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0') - slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0') - slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0') - slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0') + if opts.cluster_type == "mesos": + slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0') + slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0') + slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0') + slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0') if zoo_group.rules == []: # Group was just now created zoo_group.authorize(src_group=master_group) zoo_group.authorize(src_group=slave_group) @@ -179,13 +191,19 @@ def launch_cluster(conn, opts, cluster_name): "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name)) sys.exit(1) - if opts.ami == "latest": + if opts.ami in ["latest", "standalone"]: + # Figure out the latest AMI from our static URL + if opts.ami == "latest": + url = LATEST_AMI_URL + elif opts.ami == "standalone": + url = LATEST_STANDALONE_AMI_URL + try: - opts.ami = urllib2.urlopen(LATEST_AMI_URL).read().strip() + opts.ami = urllib2.urlopen(url).read().strip() print "Latest Spark AMI: " + opts.ami except: - print >> stderr, "Could not read " + LATEST_AMI_URL + print >> stderr, "Could not read " + url print "Launching instances..." @@ -314,14 +332,25 @@ def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_k master = master_nodes[0].public_dns_name if deploy_ssh_key: print "Copying SSH key %s to master..." % opts.identity_file - ssh(master, opts, 'mkdir -p /root/.ssh') - scp(master, opts, opts.identity_file, '/root/.ssh/id_rsa') + ssh(master, opts, 'mkdir -p ~/.ssh') + scp(master, opts, opts.identity_file, '~/.ssh/id_rsa') print "Running setup on master..." + if opts.cluster_type == "mesos": + setup_mesos_cluster(master, opts) + elif opts.cluster_type == "standalone": + setup_standalone_cluster(master, slave_nodes, opts) + print "Done!" + +def setup_mesos_cluster(master, opts): ssh(master, opts, "chmod u+x mesos-ec2/setup") ssh(master, opts, "mesos-ec2/setup %s %s %s %s" % ("generic", "none", "master", opts.swap)) - print "Done!" +def setup_standalone_cluster(master, slave_nodes, opts): + slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes]) + ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips)) + ssh(master, opts, "/home/ec2-user/spark/bin/start-all.sh") + # Wait for a whole cluster (masters, slaves and ZooKeeper) to start up def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes, zoo_nodes): @@ -380,9 +409,12 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes): zoo_list = '\n'.join([i.public_dns_name for i in zoo_nodes]) cluster_url = "zoo://" + ",".join( ["%s:2181/mesos" % i.public_dns_name for i in zoo_nodes]) - else: + elif opts.cluster_type == "mesos": zoo_list = "NONE" cluster_url = "%s:5050" % active_master + elif opts.cluster_type == "standalone": + zoo_list = "NONE" + cluster_url = "%s:7077" % active_master template_vars = { "master_list": '\n'.join([i.public_dns_name for i in master_nodes]), @@ -416,7 +448,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes): dest.close() # rsync the whole directory over to the master machine command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " + - "'%s/' 'root@%s:/'") % (opts.identity_file, tmp_dir, active_master)) + "'%s/' '%s@%s:~'") % (opts.identity_file, tmp_dir, opts.user, active_master)) subprocess.check_call(command, shell=True) # Remove the temp directory we created above shutil.rmtree(tmp_dir) @@ -425,15 +457,15 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes): # Copy a file to a given host through scp, throwing an exception if scp fails def scp(host, opts, local_file, dest_file): subprocess.check_call( - "scp -q -o StrictHostKeyChecking=no -i %s '%s' 'root@%s:%s'" % - (opts.identity_file, local_file, host, dest_file), shell=True) + "scp -q -o StrictHostKeyChecking=no -i %s '%s' '%s@%s:%s'" % + (opts.identity_file, local_file, opts.user, host, dest_file), shell=True) # Run a command on a host through ssh, throwing an exception if ssh fails def ssh(host, opts, command): subprocess.check_call( - "ssh -t -o StrictHostKeyChecking=no -i %s root@%s '%s'" % - (opts.identity_file, host, command), shell=True) + "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" % + (opts.identity_file, opts.user, host, command), shell=True) def main(): @@ -480,8 +512,8 @@ def main(): proxy_opt = "" if opts.proxy_port != None: proxy_opt = "-D " + opts.proxy_port - subprocess.check_call("ssh -o StrictHostKeyChecking=no -i %s %s root@%s" % - (opts.identity_file, proxy_opt, master), shell=True) + subprocess.check_call("ssh -o StrictHostKeyChecking=no -i %s %s %s@%s" % + (opts.identity_file, proxy_opt, opts.user, master), shell=True) elif action == "get-master": (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(conn, opts, cluster_name) |