about | summary | refs | log | tree | commit | diff
path: root/ec2/spark_ec2.py
diff options
context:
space:
mode:
Diffstat (limited to 'ec2/spark_ec2.py')
-rwxr-xr-x ec2/spark_ec2.py 81
1 files changed, 62 insertions, 19 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index a5384d3bda..66b1faf2cd 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -82,12 +82,21 @@ def parse_args():
parser.add_option("--spot-price", metavar="PRICE", type="float",
help="If specified, launch slaves as spot instances with the given " +
"maximum price (in dollars)")
- parser.add_option("-c", "--cluster-type", default="mesos",
- help="'mesos' for a mesos cluster, 'standalone' for a standalone spark cluster (default: mesos)")
+ parser.add_option("--cluster-type", type="choice", metavar="TYPE",
+ choices=["mesos", "standalone"], default="mesos",
+ help="'mesos' for a Mesos cluster, 'standalone' for a standalone " +
+ "Spark cluster (default: mesos)")
+ parser.add_option("--ganglia", action="store_true", default=True,
+ help="Setup Ganglia monitoring on cluster (default: on). NOTE: " +
+ "the Ganglia page will be publicly accessible")
+ parser.add_option("--no-ganglia", action="store_false", dest="ganglia",
+ help="Disable Ganglia monitoring for the cluster")
+ parser.add_option("--new-scripts", action="store_true", default=False,
+ help="Use new spark-ec2 scripts, for Spark >= 0.7 AMIs")
parser.add_option("-u", "--user", default="root",
- help="The ssh user you want to connect as (default: root)")
+ help="The SSH user you want to connect as (default: root)")
parser.add_option("--delete-groups", action="store_true", default=False,
- help="When destroying a cluster, also destroy the security groups that were created")
+ help="When destroying a cluster, delete the security groups that were created")
(opts, args) = parser.parse_args()
if len(args) != 2:
@@ -164,22 +173,23 @@ def launch_cluster(conn, opts, cluster_name):
master_group.authorize(src_group=zoo_group)
master_group.authorize('tcp', 22, 22, '0.0.0.0/0')
master_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
+ master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
+ master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
+ master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
if opts.cluster_type == "mesos":
- master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
- master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
- master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0')
+ if opts.ganglia:
+ master_group.authorize('tcp', 5080, 5080, '0.0.0.0/0')
if slave_group.rules == []: # Group was just now created
slave_group.authorize(src_group=master_group)
slave_group.authorize(src_group=slave_group)
slave_group.authorize(src_group=zoo_group)
slave_group.authorize('tcp', 22, 22, '0.0.0.0/0')
slave_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
- if opts.cluster_type == "mesos":
- slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0')
- slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0')
- slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
- slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
+ slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0')
+ slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0')
+ slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
+ slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
if zoo_group.rules == []: # Group was just now created
zoo_group.authorize(src_group=master_group)
zoo_group.authorize(src_group=slave_group)
@@ -358,19 +368,38 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.
def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_key):
- print "Deploying files to master..."
- deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes, zoo_nodes)
+ # All ssh/scp calls below target the first master's public DNS name.
master = master_nodes[0].public_dns_name
+ # When requested, copy the local identity file to ~/.ssh/id_rsa on the master
+ # and restrict its permissions to the owner.
if deploy_ssh_key:
print "Copying SSH key %s to master..." % opts.identity_file
ssh(master, opts, 'mkdir -p ~/.ssh')
scp(master, opts, opts.identity_file, '~/.ssh/id_rsa')
ssh(master, opts, 'chmod 600 ~/.ssh/id_rsa')
- print "Running setup on master..."
+
+ # Choose the module list for this cluster type; it is passed to deploy_files
+ # below. Both types share the two HDFS modules and differ in the last entry.
if opts.cluster_type == "mesos":
- setup_mesos_cluster(master, opts)
+ modules = ['ephemeral-hdfs', 'persistent-hdfs', 'mesos']
elif opts.cluster_type == "standalone":
- setup_standalone_cluster(master, slave_nodes, opts)
+ modules = ['ephemeral-hdfs', 'persistent-hdfs', 'spark-standalone']
+
+ # Ganglia is an extra module, added only when opts.ganglia is set.
+ if opts.ganglia:
+ modules.append('ganglia')
+
+ if opts.new_scripts:
+ # NOTE: We should clone the repository before running deploy_files to
+ # prevent ec2-variables.sh from being overwritten
+ ssh(master, opts, "rm -rf spark-ec2 && git clone https://github.com/mesos/spark-ec2.git")
+
+ print "Deploying files to master..."
+ deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes,
+ zoo_nodes, modules)
+
+ print "Running setup on master..."
+ # Without opts.new_scripts the per-cluster-type setup functions are used;
+ # otherwise setup_spark_cluster is called.
+ # NOTE(review): indentation is lost in this view -- the "else" below
+ # presumably pairs with "if not opts.new_scripts"; confirm against the file.
+ if not opts.new_scripts:
+ if opts.cluster_type == "mesos":
+ setup_mesos_cluster(master, opts)
+ elif opts.cluster_type == "standalone":
+ setup_standalone_cluster(master, slave_nodes, opts)
+ else:
+ setup_spark_cluster(master, opts)
print "Done!"
def setup_mesos_cluster(master, opts):
@@ -383,6 +412,17 @@ def setup_standalone_cluster(master, slave_nodes, opts):
ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips))
ssh(master, opts, "/root/spark/bin/start-all.sh")
+# Run spark-ec2/setup.sh on the master, then print the URLs at which the
+# cluster (and Ganglia, when opts.ganglia is set) are reported as started.
+# NOTE(review): spark-ec2/ is presumably the checkout cloned by setup_cluster
+# when opts.new_scripts is set -- confirm against the caller.
+def setup_spark_cluster(master, opts):
+ # Mark the setup script executable, then run it through the ssh helper.
+ ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
+ ssh(master, opts, "spark-ec2/setup.sh")
+ # Both cluster types are reported on port 8080; only the label differs.
+ if opts.cluster_type == "mesos":
+ print "Mesos cluster started at http://%s:8080" % master
+ elif opts.cluster_type == "standalone":
+ print "Spark standalone cluster started at http://%s:8080" % master
+
+ # The Ganglia URL uses port 5080 on the same master host.
+ if opts.ganglia:
+ print "Ganglia started at http://%s:5080/ganglia" % master
+
# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes, zoo_nodes):
@@ -427,7 +467,8 @@ def get_num_disks(instance_type):
# cluster (e.g. lists of masters and slaves). Files are only deployed to
# the first master instance in the cluster, and we expect the setup
# script to be run on that instance to copy them to other nodes.
-def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
+def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes,
+ modules):
active_master = master_nodes[0].public_dns_name
num_disks = get_num_disks(opts.instance_type)
@@ -459,7 +500,9 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
"cluster_url": cluster_url,
"hdfs_data_dirs": hdfs_data_dirs,
"mapred_local_dirs": mapred_local_dirs,
- "spark_local_dirs": spark_local_dirs
+ "spark_local_dirs": spark_local_dirs,
+ "swap": str(opts.swap),
+ "modules": '\n'.join(modules)
}
# Create a temp directory in which we will place all the files to be