diff options
Diffstat (limited to 'ec2/spark_ec2.py')
-rwxr-xr-x | ec2/spark_ec2.py | 81 |
1 files changed, 62 insertions, 19 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index a5384d3bda..66b1faf2cd 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -82,12 +82,21 @@ def parse_args(): parser.add_option("--spot-price", metavar="PRICE", type="float", help="If specified, launch slaves as spot instances with the given " + "maximum price (in dollars)") - parser.add_option("-c", "--cluster-type", default="mesos", - help="'mesos' for a mesos cluster, 'standalone' for a standalone spark cluster (default: mesos)") + parser.add_option("--cluster-type", type="choice", metavar="TYPE", + choices=["mesos", "standalone"], default="mesos", + help="'mesos' for a Mesos cluster, 'standalone' for a standalone " + + "Spark cluster (default: mesos)") + parser.add_option("--ganglia", action="store_true", default=True, + help="Setup Ganglia monitoring on cluster (default: on). NOTE: " + + "the Ganglia page will be publicly accessible") + parser.add_option("--no-ganglia", action="store_false", dest="ganglia", + help="Disable Ganglia monitoring for the cluster") + parser.add_option("--new-scripts", action="store_true", default=False, + help="Use new spark-ec2 scripts, for Spark >= 0.7 AMIs") parser.add_option("-u", "--user", default="root", - help="The ssh user you want to connect as (default: root)") + help="The SSH user you want to connect as (default: root)") parser.add_option("--delete-groups", action="store_true", default=False, - help="When destroying a cluster, also destroy the security groups that were created") + help="When destroying a cluster, delete the security groups that were created") (opts, args) = parser.parse_args() if len(args) != 2: @@ -164,22 +173,23 @@ def launch_cluster(conn, opts, cluster_name): master_group.authorize(src_group=zoo_group) master_group.authorize('tcp', 22, 22, '0.0.0.0/0') master_group.authorize('tcp', 8080, 8081, '0.0.0.0/0') + master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0') + master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0') + master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0') if opts.cluster_type == "mesos": - master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0') - master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0') - master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0') master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0') + if opts.ganglia: + master_group.authorize('tcp', 5080, 5080, '0.0.0.0/0') if slave_group.rules == []: # Group was just now created slave_group.authorize(src_group=master_group) slave_group.authorize(src_group=slave_group) slave_group.authorize(src_group=zoo_group) slave_group.authorize('tcp', 22, 22, '0.0.0.0/0') slave_group.authorize('tcp', 8080, 8081, '0.0.0.0/0') - if opts.cluster_type == "mesos": - slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0') - slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0') - slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0') - slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0') + slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0') + slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0') + slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0') + slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0') if zoo_group.rules == []: # Group was just now created zoo_group.authorize(src_group=master_group) zoo_group.authorize(src_group=slave_group) @@ -358,19 +368,38 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): # Deploy configuration files and run setup scripts on a newly launched # or started EC2 cluster. def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_key): - print "Deploying files to master..." - deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes, zoo_nodes) master = master_nodes[0].public_dns_name if deploy_ssh_key: print "Copying SSH key %s to master..." % opts.identity_file ssh(master, opts, 'mkdir -p ~/.ssh') scp(master, opts, opts.identity_file, '~/.ssh/id_rsa') ssh(master, opts, 'chmod 600 ~/.ssh/id_rsa') - print "Running setup on master..." + if opts.cluster_type == "mesos": - setup_mesos_cluster(master, opts) + modules = ['ephemeral-hdfs', 'persistent-hdfs', 'mesos'] elif opts.cluster_type == "standalone": - setup_standalone_cluster(master, slave_nodes, opts) + modules = ['ephemeral-hdfs', 'persistent-hdfs', 'spark-standalone'] + + if opts.ganglia: + modules.append('ganglia') + + if opts.new_scripts: + # NOTE: We should clone the repository before running deploy_files to + # prevent ec2-variables.sh from being overwritten + ssh(master, opts, "rm -rf spark-ec2 && git clone https://github.com/mesos/spark-ec2.git") + + print "Deploying files to master..." + deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes, + zoo_nodes, modules) + + print "Running setup on master..." + if not opts.new_scripts: + if opts.cluster_type == "mesos": + setup_mesos_cluster(master, opts) + elif opts.cluster_type == "standalone": + setup_standalone_cluster(master, slave_nodes, opts) + else: + setup_spark_cluster(master, opts) print "Done!" def setup_mesos_cluster(master, opts): @@ -383,6 +412,17 @@ def setup_standalone_cluster(master, slave_nodes, opts): ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips)) ssh(master, opts, "/root/spark/bin/start-all.sh") +def setup_spark_cluster(master, opts): + ssh(master, opts, "chmod u+x spark-ec2/setup.sh") + ssh(master, opts, "spark-ec2/setup.sh") + if opts.cluster_type == "mesos": + print "Mesos cluster started at http://%s:8080" % master + elif opts.cluster_type == "standalone": + print "Spark standalone cluster started at http://%s:8080" % master + + if opts.ganglia: + print "Ganglia started at http://%s:5080/ganglia" % master + # Wait for a whole cluster (masters, slaves and ZooKeeper) to start up def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes, zoo_nodes): @@ -427,7 +467,8 @@ def get_num_disks(instance_type): # cluster (e.g. lists of masters and slaves). Files are only deployed to # the first master instance in the cluster, and we expect the setup # script to be run on that instance to copy them to other nodes. -def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes): +def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes, + modules): active_master = master_nodes[0].public_dns_name num_disks = get_num_disks(opts.instance_type) @@ -459,7 +500,9 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes): "cluster_url": cluster_url, "hdfs_data_dirs": hdfs_data_dirs, "mapred_local_dirs": mapred_local_dirs, - "spark_local_dirs": spark_local_dirs + "spark_local_dirs": spark_local_dirs, + "swap": str(opts.swap), + "modules": '\n'.join(modules) } # Create a temp directory in which we will place all the files to be |