author    Patrick Wendell <pwendell@gmail.com>    2013-06-26 11:15:48 -0700
committer Patrick Wendell <pwendell@gmail.com>    2013-06-26 11:36:39 -0700
commit    13e3dd98d86a89f9b2d2e6c9a8f03bd0b79fba70 (patch)
tree      5ecdd8d6a42d6b258a26b64af6453951f8027f1d /ec2
parent    d5f74aa9b7d806e2d189f20dcf1b619653507650 (diff)
Removing mesos support

This drops the Mesos code paths from the EC2 scripts: the --cluster-type
and --old-scripts options, the ZooKeeper security group and node handling,
and the mesos-ec2 setup hooks. spark_ec2.py now always launches standalone
Spark clusters.
Diffstat (limited to 'ec2')
-rw-r--r--  ec2/deploy.generic/root/mesos-ec2/ec2-variables.sh |   9 ---------
-rwxr-xr-x  ec2/spark_ec2.py                                   | 130 +++-------
2 files changed, 30 insertions(+), 109 deletions(-)
diff --git a/ec2/deploy.generic/root/mesos-ec2/ec2-variables.sh b/ec2/deploy.generic/root/mesos-ec2/ec2-variables.sh
deleted file mode 100644
index 50ecf83404..0000000000
--- a/ec2/deploy.generic/root/mesos-ec2/ec2-variables.sh
+++ /dev/null
@@ -1,9 +0,0 @@
-#!/bin/bash
-
-# These variables are automatically filled in by the mesos-ec2 script.
-export MESOS_MASTERS="{{master_list}}"
-export MESOS_SLAVES="{{slave_list}}"
-export MESOS_ZOO_LIST="{{zoo_list}}"
-export MESOS_HDFS_DATA_DIRS="{{hdfs_data_dirs}}"
-export MESOS_MAPRED_LOCAL_DIRS="{{mapred_local_dirs}}"
-export MESOS_SPARK_LOCAL_DIRS="{{spark_local_dirs}}"
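The {{...}} placeholders in the deleted template are filled in by deploy_files (see the spark_ec2.py hunks below) with plain string substitution. A minimal sketch of that pass, using a hypothetical helper name apply_template (the real logic lives inside deploy_files):

    # Sketch only: replace every {{key}} placeholder with its value from
    # the deploy context built in deploy_files.
    def apply_template(text, template_vars):
        for key, value in template_vars.items():
            text = text.replace("{{" + key + "}}", value)
        return text

    filled = apply_template('export MESOS_MASTERS="{{master_list}}"',
                            {"master_list": "ec2-1-2-3-4.compute-1.amazonaws.com"})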
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 0b4dc2077e..d8890f26f4 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -94,17 +94,11 @@ def parse_args():
parser.add_option("--spot-price", metavar="PRICE", type="float",
help="If specified, launch slaves as spot instances with the given " +
"maximum price (in dollars)")
- parser.add_option("--cluster-type", type="choice", metavar="TYPE",
- choices=["mesos", "standalone"], default="standalone",
- help="'mesos' for a Mesos cluster, 'standalone' for a standalone " +
- "Spark cluster (default: standalone)")
parser.add_option("--ganglia", action="store_true", default=True,
help="Setup Ganglia monitoring on cluster (default: on). NOTE: " +
"the Ganglia page will be publicly accessible")
parser.add_option("--no-ganglia", action="store_false", dest="ganglia",
help="Disable Ganglia monitoring for the cluster")
- parser.add_option("--old-scripts", action="store_true", default=False,
- help="Use old mesos-ec2 scripts, for Spark <= 0.6 AMIs")
parser.add_option("-u", "--user", default="root",
help="The SSH user you want to connect as (default: root)")
parser.add_option("--delete-groups", action="store_true", default=False,
@@ -119,9 +113,6 @@ def parse_args():
print >> stderr, ("ERROR: The -i or --identity-file argument is " +
"required for " + action)
sys.exit(1)
- if opts.cluster_type not in ["mesos", "standalone"] and action == "launch":
- print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type)
- sys.exit(1)
# Boto config check
# http://boto.cloudhackers.com/en/latest/boto_config_tut.html
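The hunk cuts off at the Boto config check; its body is outside the diff context. A plausible sketch of such a check (assumed, not taken from this commit):

    # Assumed sketch: make sure boto can find AWS credentials, either via a
    # boto config file or via environment variables, before connecting.
    import os
    import sys
    from sys import stderr

    if not any(os.path.isfile(p)
               for p in [os.path.expanduser("~/.boto"), "/etc/boto.cfg"]):
        if (os.getenv("AWS_ACCESS_KEY_ID") is None or
                os.getenv("AWS_SECRET_ACCESS_KEY") is None):
            print >> stderr, "ERROR: set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY"
            sys.exit(1)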
@@ -219,52 +210,38 @@ def get_spark_ami(opts):
# Launch a cluster of the given name, by setting up its security groups,
# and then starting new instances in them.
-# Returns a tuple of EC2 reservation objects for the master, slave
-# and zookeeper instances (in that order).
+# Returns a tuple of EC2 reservation objects for the master and slaves
# Fails if there are already instances running in the cluster's groups.
def launch_cluster(conn, opts, cluster_name):
print "Setting up security groups..."
master_group = get_or_make_group(conn, cluster_name + "-master")
slave_group = get_or_make_group(conn, cluster_name + "-slaves")
- zoo_group = get_or_make_group(conn, cluster_name + "-zoo")
if master_group.rules == []: # Group was just now created
master_group.authorize(src_group=master_group)
master_group.authorize(src_group=slave_group)
- master_group.authorize(src_group=zoo_group)
master_group.authorize('tcp', 22, 22, '0.0.0.0/0')
master_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
- if opts.cluster_type == "mesos":
- master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0')
if opts.ganglia:
master_group.authorize('tcp', 5080, 5080, '0.0.0.0/0')
if slave_group.rules == []: # Group was just now created
slave_group.authorize(src_group=master_group)
slave_group.authorize(src_group=slave_group)
- slave_group.authorize(src_group=zoo_group)
slave_group.authorize('tcp', 22, 22, '0.0.0.0/0')
slave_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0')
slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0')
slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
- if zoo_group.rules == []: # Group was just now created
- zoo_group.authorize(src_group=master_group)
- zoo_group.authorize(src_group=slave_group)
- zoo_group.authorize(src_group=zoo_group)
- zoo_group.authorize('tcp', 22, 22, '0.0.0.0/0')
- zoo_group.authorize('tcp', 2181, 2181, '0.0.0.0/0')
- zoo_group.authorize('tcp', 2888, 2888, '0.0.0.0/0')
- zoo_group.authorize('tcp', 3888, 3888, '0.0.0.0/0')
# Check if instances are already running in our groups
active_nodes = get_existing_cluster(conn, opts, cluster_name,
die_on_error=False)
if any(active_nodes):
print >> stderr, ("ERROR: There are already instances running in " +
- "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
+ "group %s, %s or %s" % (master_group.name, slave_group.name))
sys.exit(1)
# Figure out Spark AMI
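get_or_make_group, called at the top of launch_cluster, is defined elsewhere in spark_ec2.py; a sketch of what such a helper looks like with the boto EC2 API (body assumed, not shown in this diff):

    # Assumed sketch: return the named security group, creating it if absent.
    def get_or_make_group(conn, name):
        groups = [g for g in conn.get_all_security_groups() if g.name == name]
        if groups:
            return groups[0]
        print "Creating security group " + name
        return conn.create_security_group(name, "Spark EC2 group")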
@@ -336,9 +313,9 @@ def launch_cluster(conn, opts, cluster_name):
print "Canceling spot instance requests"
conn.cancel_spot_instance_requests(my_req_ids)
# Log a warning if any of these requests actually launched instances:
- (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ (master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
- running = len(master_nodes) + len(slave_nodes) + len(zoo_nodes)
+ running = len(master_nodes) + len(slave_nodes)
if running:
print >> stderr, ("WARNING: %d instances are still running" % running)
sys.exit(0)
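my_req_ids above comes from an earlier request_spot_instances call that the diff context omits. Roughly how those requests are created with boto (keyword names are boto 2's; the values are illustrative):

    # Assumed sketch: request spot instances and collect the request ids
    # that the cancellation path above operates on.
    reqs = conn.request_spot_instances(
        price=opts.spot_price,          # --spot-price, in dollars
        image_id=ami,
        count=opts.slaves,
        key_name=opts.key_pair,
        security_groups=[slave_group],
        instance_type=opts.instance_type)
    my_req_ids = [req.id for req in reqs]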
@@ -379,21 +356,17 @@ def launch_cluster(conn, opts, cluster_name):
master_nodes = master_res.instances
print "Launched master in %s, regid = %s" % (zone, master_res.id)
- zoo_nodes = []
-
# Return all the instances
- return (master_nodes, slave_nodes, zoo_nodes)
+ return (master_nodes, slave_nodes)
# Get the EC2 instances in an existing cluster if available.
-# Returns a tuple of lists of EC2 instance objects for the masters,
-# slaves and zookeeper nodes (in that order).
+# Returns a tuple of lists of EC2 instance objects for the masters and slaves
def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
print "Searching for existing cluster " + cluster_name + "..."
reservations = conn.get_all_instances()
master_nodes = []
slave_nodes = []
- zoo_nodes = []
for res in reservations:
active = [i for i in res.instances if is_active(i)]
if len(active) > 0:
@@ -402,13 +375,11 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
master_nodes += res.instances
elif group_names == [cluster_name + "-slaves"]:
slave_nodes += res.instances
- elif group_names == [cluster_name + "-zoo"]:
- zoo_nodes += res.instances
- if any((master_nodes, slave_nodes, zoo_nodes)):
- print ("Found %d master(s), %d slaves, %d ZooKeeper nodes" %
- (len(master_nodes), len(slave_nodes), len(zoo_nodes)))
+ if any((master_nodes, slave_nodes)):
+ print ("Found %d master(s), %d slaves" %
+ (len(master_nodes), len(slave_nodes)))
if (master_nodes != [] and slave_nodes != []) or not die_on_error:
- return (master_nodes, slave_nodes, zoo_nodes)
+ return (master_nodes, slave_nodes)
else:
if master_nodes == [] and slave_nodes != []:
print "ERROR: Could not find master in group " + cluster_name + "-master"
@@ -421,7 +392,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.
-def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_key):
+def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
master = master_nodes[0].public_dns_name
if deploy_ssh_key:
print "Copying SSH key %s to master..." % opts.identity_file
@@ -429,12 +400,8 @@ def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_k
scp(master, opts, opts.identity_file, '~/.ssh/id_rsa')
ssh(master, opts, 'chmod 600 ~/.ssh/id_rsa')
- if opts.cluster_type == "mesos":
- modules = ['spark', 'shark', 'ephemeral-hdfs', 'persistent-hdfs',
- 'mapreduce', 'mesos']
- elif opts.cluster_type == "standalone":
- modules = ['spark', 'shark', 'ephemeral-hdfs', 'persistent-hdfs',
- 'mapreduce', 'spark-standalone']
+ modules = ['spark', 'shark', 'ephemeral-hdfs', 'persistent-hdfs',
+ 'mapreduce', 'spark-standalone']
if opts.ganglia:
modules.append('ganglia')
@@ -446,18 +413,12 @@ def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_k
ssh(master, opts, "rm -rf spark-ec2 && git clone https://github.com/pwendell/spark-ec2.git -b ec2-updates")
print "Deploying files to master..."
- deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes,
- zoo_nodes, modules)
+ deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes, modules)
print "Running setup on master..."
setup_spark_cluster(master, opts)
print "Done!"
-def setup_mesos_cluster(master, opts):
- ssh(master, opts, "chmod u+x mesos-ec2/setup")
- ssh(master, opts, "mesos-ec2/setup %s %s %s %s" %
- ("generic", "none", "master", opts.swap))
-
def setup_standalone_cluster(master, slave_nodes, opts):
slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes])
ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips))
@@ -466,23 +427,18 @@ def setup_standalone_cluster(master, slave_nodes, opts):
def setup_spark_cluster(master, opts):
ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
ssh(master, opts, "spark-ec2/setup.sh")
- if opts.cluster_type == "mesos":
- print "Mesos cluster started at http://%s:8080" % master
- elif opts.cluster_type == "standalone":
- print "Spark standalone cluster started at http://%s:8080" % master
+ print "Spark standalone cluster started at http://%s:8080" % master
if opts.ganglia:
print "Ganglia started at http://%s:5080/ganglia" % master
# Wait for a whole cluster (masters and slaves) to start up
-def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes, zoo_nodes):
+def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes):
print "Waiting for instances to start up..."
time.sleep(5)
wait_for_instances(conn, master_nodes)
wait_for_instances(conn, slave_nodes)
- if zoo_nodes != []:
- wait_for_instances(conn, zoo_nodes)
print "Waiting %d more seconds..." % wait_secs
time.sleep(wait_secs)
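wait_for_instances polls until no instance in the list is still pending; a sketch (assumed, using boto's Instance.update()):

    # Assumed sketch: poll instance state via the EC2 API until none are pending.
    import time

    def wait_for_instances(conn, instances):
        while True:
            for i in instances:
                i.update()
            if any(i.state == 'pending' for i in instances):
                time.sleep(5)
            else:
                return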
@@ -523,8 +479,7 @@ def get_num_disks(instance_type):
# cluster (e.g. lists of masters and slaves). Files are only deployed to
# the first master instance in the cluster, and we expect the setup
# script to be run on that instance to copy them to other nodes.
-def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes,
- modules):
+def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
active_master = master_nodes[0].public_dns_name
num_disks = get_num_disks(opts.instance_type)
@@ -537,16 +492,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes,
mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
spark_local_dirs += ",/mnt%d/spark" % i
- if zoo_nodes != []:
- zoo_list = '\n'.join([i.public_dns_name for i in zoo_nodes])
- cluster_url = "zoo://" + ",".join(
- ["%s:2181/mesos" % i.public_dns_name for i in zoo_nodes])
- elif opts.cluster_type == "mesos":
- zoo_list = "NONE"
- cluster_url = "%s:5050" % active_master
- elif opts.cluster_type == "standalone":
- zoo_list = "NONE"
- cluster_url = "%s:7077" % active_master
+ cluster_url = "%s:7077" % active_master
if "." in opts.spark_version:
(spark_v, shark_v) = get_spark_shark_version(opts)
@@ -558,7 +504,6 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes,
"master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
"active_master": active_master,
"slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
- "zoo_list": zoo_list,
"cluster_url": cluster_url,
"hdfs_data_dirs": hdfs_data_dirs,
"mapred_local_dirs": mapred_local_dirs,
@@ -656,20 +601,20 @@ def main():
if action == "launch":
if opts.resume:
- (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ (master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name)
else:
- (master_nodes, slave_nodes, zoo_nodes) = launch_cluster(
+ (master_nodes, slave_nodes) = launch_cluster(
conn, opts, cluster_name)
- wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes, zoo_nodes)
- setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, True)
+ wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
+ setup_cluster(conn, master_nodes, slave_nodes, opts, True)
elif action == "destroy":
response = raw_input("Are you sure you want to destroy the cluster " +
cluster_name + "?\nALL DATA ON ALL NODES WILL BE LOST!!\n" +
"Destroy cluster " + cluster_name + " (y/N): ")
if response == "y":
- (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ (master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
print "Terminating master..."
for inst in master_nodes:
@@ -677,15 +622,11 @@ def main():
print "Terminating slaves..."
for inst in slave_nodes:
inst.terminate()
- if zoo_nodes != []:
- print "Terminating zoo..."
- for inst in zoo_nodes:
- inst.terminate()
# Delete security groups as well
if opts.delete_groups:
print "Deleting security groups (this will take some time)..."
- group_names = [cluster_name + "-master", cluster_name + "-slaves", cluster_name + "-zoo"]
+ group_names = [cluster_name + "-master", cluster_name + "-slaves"]
attempt = 1
while attempt <= 3:
@@ -725,7 +666,7 @@ def main():
print "Try re-running in a few minutes."
elif action == "login":
- (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ (master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name)
master = master_nodes[0].public_dns_name
print "Logging into master " + master + "..."
@@ -736,7 +677,7 @@ def main():
(opts.identity_file, proxy_opt, opts.user, master), shell=True)
elif action == "get-master":
- (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(conn, opts, cluster_name)
+ (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
print master_nodes[0].public_dns_name
elif action == "stop":
@@ -746,7 +687,7 @@ def main():
"AMAZON EBS IF IT IS EBS-BACKED!!\n" +
"Stop cluster " + cluster_name + " (y/N): ")
if response == "y":
- (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ (master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
print "Stopping master..."
for inst in master_nodes:
@@ -756,15 +697,9 @@ def main():
for inst in slave_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.stop()
- if zoo_nodes != []:
- print "Stopping zoo..."
- for inst in zoo_nodes:
- if inst.state not in ["shutting-down", "terminated"]:
- inst.stop()
elif action == "start":
- (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
- conn, opts, cluster_name)
+ (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
print "Starting slaves..."
for inst in slave_nodes:
if inst.state not in ["shutting-down", "terminated"]:
@@ -773,13 +708,8 @@ def main():
for inst in master_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
- if zoo_nodes != []:
- print "Starting zoo..."
- for inst in zoo_nodes:
- if inst.state not in ["shutting-down", "terminated"]:
- inst.start()
- wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes, zoo_nodes)
- setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, False)
+ wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
+ setup_cluster(conn, master_nodes, slave_nodes, opts, False)
else:
print >> stderr, "Invalid action: %s" % action