diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-17 13:48:43 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-17 13:48:43 -0800 |
commit | 2a907dceb3c40f9eaea2501b49e2e203003fbd72 (patch) | |
tree | cc9022a786b336c36ec02c66da20757493886b2d /ec2/spark_ec2.py | |
parent | 340cc54e47993e4de1bca8e5f635ffb2d315b262 (diff) | |
parent | bf675ab4f90fdbea67e42a8df828ef15ed87a086 (diff) | |
download | spark-2a907dceb3c40f9eaea2501b49e2e203003fbd72.tar.gz spark-2a907dceb3c40f9eaea2501b49e2e203003fbd72.tar.bz2 spark-2a907dceb3c40f9eaea2501b49e2e203003fbd72.zip |
Merge pull request #421 from shivaram/spark-ec2-change
Switch spark_ec2.py to use the new spark-ec2 scripts.
Diffstat (limited to 'ec2/spark_ec2.py')
-rwxr-xr-x | ec2/spark_ec2.py | 63 |
1 files changed, 48 insertions, 15 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index a5384d3bda..ce1072fd39 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -84,6 +84,12 @@ def parse_args(): "maximum price (in dollars)") parser.add_option("-c", "--cluster-type", default="mesos", help="'mesos' for a mesos cluster, 'standalone' for a standalone spark cluster (default: mesos)") + parser.add_option("-g", "--ganglia", action="store_true", default=True, + help="Setup ganglia monitoring for the cluster. NOTE: The ganglia " + + "monitoring page will be publicly accessible") + parser.add_option("--mesos-scripts", action="store_true", default=False, + help="Use older mesos-ec2 scripts to setup the cluster. NOTE: Ganglia " + + "will not be setup with this option") parser.add_option("-u", "--user", default="root", help="The ssh user you want to connect as (default: root)") parser.add_option("--delete-groups", action="store_true", default=False, @@ -164,22 +170,23 @@ def launch_cluster(conn, opts, cluster_name): master_group.authorize(src_group=zoo_group) master_group.authorize('tcp', 22, 22, '0.0.0.0/0') master_group.authorize('tcp', 8080, 8081, '0.0.0.0/0') + master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0') + master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0') + master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0') if opts.cluster_type == "mesos": - master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0') - master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0') - master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0') master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0') + if opts.ganglia: + master_group.authorize('tcp', 80, 80, '0.0.0.0/0') if slave_group.rules == []: # Group was just now created slave_group.authorize(src_group=master_group) slave_group.authorize(src_group=slave_group) slave_group.authorize(src_group=zoo_group) slave_group.authorize('tcp', 22, 22, '0.0.0.0/0') slave_group.authorize('tcp', 8080, 8081, '0.0.0.0/0') - if opts.cluster_type == "mesos": - slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0') - slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0') - slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0') - slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0') + slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0') + slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0') + slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0') + slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0') if zoo_group.rules == []: # Group was just now created zoo_group.authorize(src_group=master_group) zoo_group.authorize(src_group=slave_group) @@ -358,19 +365,38 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): # Deploy configuration files and run setup scripts on a newly launched # or started EC2 cluster. def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_key): - print "Deploying files to master..." - deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes, zoo_nodes) master = master_nodes[0].public_dns_name if deploy_ssh_key: print "Copying SSH key %s to master..." % opts.identity_file ssh(master, opts, 'mkdir -p ~/.ssh') scp(master, opts, opts.identity_file, '~/.ssh/id_rsa') ssh(master, opts, 'chmod 600 ~/.ssh/id_rsa') - print "Running setup on master..." + if opts.cluster_type == "mesos": - setup_mesos_cluster(master, opts) + modules = ['ephemeral-hdfs', 'persistent-hdfs', 'mesos'] elif opts.cluster_type == "standalone": - setup_standalone_cluster(master, slave_nodes, opts) + modules = ['ephemeral-hdfs', 'persistent-hdfs', 'spark-standalone'] + + if opts.ganglia: + modules.append('ganglia') + + if not opts.mesos_scripts: + # NOTE: We should clone the repository before running deploy_files to + # prevent ec2-variables.sh from being overwritten + ssh(master, opts, "rm -rf spark-ec2 && git clone https://github.com/shivaram/spark-ec2.git") + + print "Deploying files to master..." + deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes, + zoo_nodes, modules) + + print "Running setup on master..." + if opts.mesos_scripts: + if opts.cluster_type == "mesos": + setup_mesos_cluster(master, opts) + elif opts.cluster_type == "standalone": + setup_standalone_cluster(master, slave_nodes, opts) + else: + setup_spark_cluster(master, opts) print "Done!" def setup_mesos_cluster(master, opts): @@ -383,6 +409,10 @@ def setup_standalone_cluster(master, slave_nodes, opts): ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips)) ssh(master, opts, "/root/spark/bin/start-all.sh") +def setup_spark_cluster(master, opts): + ssh(master, opts, "chmod u+x spark-ec2/setup.sh") + ssh(master, opts, "spark-ec2/setup.sh") + # Wait for a whole cluster (masters, slaves and ZooKeeper) to start up def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes, zoo_nodes): @@ -427,7 +457,8 @@ def get_num_disks(instance_type): # cluster (e.g. lists of masters and slaves). Files are only deployed to # the first master instance in the cluster, and we expect the setup # script to be run on that instance to copy them to other nodes. -def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes): +def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes, + modules): active_master = master_nodes[0].public_dns_name num_disks = get_num_disks(opts.instance_type) @@ -459,7 +490,9 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes): "cluster_url": cluster_url, "hdfs_data_dirs": hdfs_data_dirs, "mapred_local_dirs": mapred_local_dirs, - "spark_local_dirs": spark_local_dirs + "spark_local_dirs": spark_local_dirs, + "swap": str(opts.swap), + "modules": '\n'.join(modules) } # Create a temp directory in which we will place all the files to be |