-rw-r--r--  docs/ec2-scripts.md                                 |  32
-rw-r--r--  ec2/deploy.generic/root/mesos-ec2/ec2-variables.sh  |   9
-rw-r--r--  ec2/deploy.generic/root/spark-ec2/ec2-variables.sh  |  16
-rwxr-xr-x  ec2/spark_ec2.py                                    | 248
4 files changed, 145 insertions(+), 160 deletions(-)
diff --git a/docs/ec2-scripts.md b/docs/ec2-scripts.md
index eab8a0ff20..bd787e0e46 100644
--- a/docs/ec2-scripts.md
+++ b/docs/ec2-scripts.md
@@ -4,10 +4,11 @@ title: Running Spark on EC2
---
The `spark-ec2` script, located in Spark's `ec2` directory, allows you
-to launch, manage and shut down Spark clusters on Amazon EC2. It automatically sets up Mesos, Spark and HDFS
-on the cluster for you.
-This guide describes how to use `spark-ec2` to launch clusters, how to run jobs on them, and how to shut them down.
-It assumes you've already signed up for an EC2 account on the [Amazon Web Services site](http://aws.amazon.com/).
+to launch, manage and shut down Spark clusters on Amazon EC2. It automatically
+sets up Spark, Shark and HDFS on the cluster for you. This guide describes
+how to use `spark-ec2` to launch clusters, how to run jobs on them, and how
+to shut them down. It assumes you've already signed up for an EC2 account
+on the [Amazon Web Services site](http://aws.amazon.com/).
`spark-ec2` is designed to manage multiple named clusters. You can
launch a new cluster (telling the script its size and giving it a name),
@@ -59,18 +60,22 @@ RAM). Refer to the Amazon pages about [EC2 instance
types](http://aws.amazon.com/ec2/instance-types) and [EC2
pricing](http://aws.amazon.com/ec2/#pricing) for information about other
instance types.
+- `--region=<EC2_REGION>` specifies an EC2 region in which to launch
+instances. The default region is `us-east-1`.
- `--zone=<EC2_ZONE>` can be used to specify an EC2 availability zone
to launch instances in. Sometimes, you will get an error because there
is not enough capacity in one zone, and you should try to launch in
-another. This happens mostly with the `m1.large` instance types;
-extra-large (both `m1.xlarge` and `c1.xlarge`) instances tend to be more
-available.
+another.
- `--ebs-vol-size=GB` will attach an EBS volume with a given amount
of space to each node so that you can have a persistent HDFS cluster
on your nodes across cluster restarts (see below).
- `--spot-price=PRICE` will launch the worker nodes as
[Spot Instances](http://aws.amazon.com/ec2/spot-instances/),
bidding for the given maximum price (in dollars).
+- `--spark-version=VERSION` will pre-load the cluster with the
+ specified version of Spark. VERSION can be a version number
+ (e.g. "0.7.2") or a specific git hash. By default, a recent
+ version will be used.
- If one of your launches fails due to e.g. not having the right
permissions on your private key file, you can run `launch` with the
`--resume` option to restart the setup process on an existing cluster.
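The new flags above are wired up through `optparse` in `spark_ec2.py`. A minimal sketch of how they parse, assuming the `--region` flag and defaults described in these docs (only a subset of the real option set is shown):

```python
from optparse import OptionParser

# Sketch of the new options added to parse_args; names and defaults
# mirror the patch, but this is not the full parser.
parser = OptionParser(usage="spark-ec2 [options] <action> <cluster_name>")
parser.add_option("--region", default="us-east-1",
    help="EC2 region in which to launch instances (default: us-east-1)")
parser.add_option("-v", "--spark-version", default="0.7.2",
    help="Version of Spark to use: 'X.Y.Z' or a specific git hash")
parser.add_option("--spark-git-repo",
    default="https://github.com/mesos/spark",
    help="Github repo from which to checkout supplied commit hash")
parser.add_option("--hadoop-major-version", default="2",
    help="Major version of Hadoop (default: 2)")

(opts, args) = parser.parse_args([])
print(opts.region)         # defaults apply when no flags are given
print(opts.spark_version)
```

With no arguments, the defaults above are what `launch` would use: region `us-east-1`, Spark `0.7.2`, Hadoop major version `2`.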
@@ -99,9 +104,8 @@ permissions on your private key file, you can run `launch` with the
`spark-ec2` to attach a persistent EBS volume to each node for
storing the persistent HDFS.
- Finally, if you get errors while running your jobs, look at the slave's logs
- for that job inside of the Mesos work directory (/mnt/mesos-work). You can
- also view the status of the cluster using the Mesos web UI
- (`http://<master-hostname>:8080`).
+ for that job inside of the scheduler work directory (/root/spark/work). You can
+ also view the status of the cluster using the web UI: `http://<master-hostname>:8080`.
# Configuration
@@ -140,22 +144,14 @@ section.
# Limitations
-- `spark-ec2` currently only launches machines in the US-East region of EC2.
- It should not be hard to make it launch VMs in other zones, but you will need
- to create your own AMIs in them.
- Support for "cluster compute" nodes is limited -- there's no way to specify a
locality group. However, you can launch slave nodes in your
`<clusterName>-slaves` group manually and then use `spark-ec2 launch
--resume` to start a cluster with them.
-- Support for spot instances is limited.
If you have a patch or suggestion for one of these limitations, feel free to
[contribute](contributing-to-spark.html) it!
-# Using a Newer Spark Version
-
-The Spark EC2 machine images may not come with the latest version of Spark. To use a newer version, you can run `git pull` to pull in `/root/spark` to pull in the latest version of Spark from `git`, and build it using `sbt/sbt compile`. You will also need to copy it to all the other nodes in the cluster using `~/spark-ec2/copy-dir /root/spark`.
-
# Accessing Data in S3
Spark's file interface allows it to process data in Amazon S3 using the same URI formats that are supported for Hadoop. You can specify a path in S3 as input through a URI of the form `s3n://<bucket>/path`. You will also need to set your Amazon security credentials, either by setting the environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` before your program or through `SparkContext.hadoopConfiguration`. Full instructions on S3 access using the Hadoop input libraries can be found on the [Hadoop S3 page](http://wiki.apache.org/hadoop/AmazonS3).
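The URI format and the environment-variable route can be sketched as follows; the bucket, path, and credential values are hypothetical placeholders, not real keys:

```python
import os

# Hypothetical credentials; set these before starting your program so
# Hadoop's S3 input library can pick them up.
os.environ["AWS_ACCESS_KEY_ID"] = "AKIA...EXAMPLE"
os.environ["AWS_SECRET_ACCESS_KEY"] = "example-secret"

# An S3 input path in the s3n:// form Spark's file interface accepts.
bucket, path = "my-bucket", "logs/2013/part-00000"
s3_uri = "s3n://%s/%s" % (bucket, path)
print(s3_uri)

# A SparkContext would then read it with, e.g.:
#   sc.textFile(s3_uri)
```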
diff --git a/ec2/deploy.generic/root/mesos-ec2/ec2-variables.sh b/ec2/deploy.generic/root/mesos-ec2/ec2-variables.sh
deleted file mode 100644
index ede6c78428..0000000000
--- a/ec2/deploy.generic/root/mesos-ec2/ec2-variables.sh
+++ /dev/null
@@ -1,9 +0,0 @@
-#!/usr/bin/env bash
-
-# These variables are automatically filled in by the mesos-ec2 script.
-export MESOS_MASTERS="{{master_list}}"
-export MESOS_SLAVES="{{slave_list}}"
-export MESOS_ZOO_LIST="{{zoo_list}}"
-export MESOS_HDFS_DATA_DIRS="{{hdfs_data_dirs}}"
-export MESOS_MAPRED_LOCAL_DIRS="{{mapred_local_dirs}}"
-export MESOS_SPARK_LOCAL_DIRS="{{spark_local_dirs}}"
diff --git a/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh b/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh
index 685ed8be8c..675429c57e 100644
--- a/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh
+++ b/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh
@@ -1,11 +1,13 @@
#!/usr/bin/env bash
-# These variables are automatically filled in by the mesos-ec2 script.
-export MESOS_MASTERS="{{master_list}}"
-export MESOS_SLAVES="{{slave_list}}"
-export MESOS_ZOO_LIST="{{zoo_list}}"
-export MESOS_HDFS_DATA_DIRS="{{hdfs_data_dirs}}"
-export MESOS_MAPRED_LOCAL_DIRS="{{mapred_local_dirs}}"
-export MESOS_SPARK_LOCAL_DIRS="{{spark_local_dirs}}"
+# These variables are automatically filled in by the spark-ec2 script.
+export MASTERS="{{master_list}}"
+export SLAVES="{{slave_list}}"
+export HDFS_DATA_DIRS="{{hdfs_data_dirs}}"
+export MAPRED_LOCAL_DIRS="{{mapred_local_dirs}}"
+export SPARK_LOCAL_DIRS="{{spark_local_dirs}}"
export MODULES="{{modules}}"
+export SPARK_VERSION="{{spark_version}}"
+export SHARK_VERSION="{{shark_version}}"
+export HADOOP_MAJOR_VERSION="{{hadoop_major_version}}"
export SWAP_MB="{{swap}}"
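The `{{...}}` placeholders in this template are filled in on the launching machine by `deploy_files`. A minimal sketch of that substitution, assuming plain string replacement over the template text (the real script walks a whole template directory):

```python
# Two lines from ec2-variables.sh, with illustrative values for the
# variables the patch adds.
template = ('export SPARK_VERSION="{{spark_version}}"\n'
            'export HADOOP_MAJOR_VERSION="{{hadoop_major_version}}"\n')
template_vars = {
    "spark_version": "0.7.2",
    "hadoop_major_version": "2",
}

text = template
for key, value in template_vars.items():
    text = text.replace("{{" + key + "}}", value)
print(text)
```

After substitution no `{{...}}` markers remain, and the file is ready to be shipped to the master.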
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 2ec3c007fb..4d49c60703 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -36,9 +36,8 @@ import boto
from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
from boto import ec2
-# A static URL from which to figure out the latest Mesos EC2 AMI
-LATEST_AMI_URL = "https://s3.amazonaws.com/mesos-images/ids/latest-spark-0.7"
-
+# A URL prefix from which to fetch AMI information
+AMI_PREFIX = "https://raw.github.com/pwendell/spark-ec2/ec2-updates/ami-list"
# Configure and parse our command-line arguments
def parse_args():
@@ -66,9 +65,17 @@ def parse_args():
help="Availability zone to launch instances in, or 'all' to spread " +
"slaves across multiple (an additional $0.01/Gb for bandwidth " +
"between zones applies)")
- parser.add_option("-a", "--ami", default="latest",
- help="Amazon Machine Image ID to use, or 'latest' to use latest " +
- "available AMI (default: latest)")
+ parser.add_option("-a", "--ami",
+ help="Amazon Machine Image ID to use")
+
+ parser.add_option("-v", "--spark-version", default="0.7.2",
+ help="Version of Spark to use: 'X.Y.Z' or a specific git hash")
+ parser.add_option("--spark-git-repo",
+ default="https://github.com/mesos/spark",
+ help="Github repo from which to checkout supplied commit hash")
+ parser.add_option("--hadoop-major-version", default="2",
+ help="Major version of Hadoop (default: 2)")
+
parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
"the given local address (for use with login)")
@@ -84,17 +91,11 @@ def parse_args():
parser.add_option("--spot-price", metavar="PRICE", type="float",
help="If specified, launch slaves as spot instances with the given " +
"maximum price (in dollars)")
- parser.add_option("--cluster-type", type="choice", metavar="TYPE",
- choices=["mesos", "standalone"], default="standalone",
- help="'mesos' for a Mesos cluster, 'standalone' for a standalone " +
- "Spark cluster (default: standalone)")
parser.add_option("--ganglia", action="store_true", default=True,
help="Setup Ganglia monitoring on cluster (default: on). NOTE: " +
"the Ganglia page will be publicly accessible")
parser.add_option("--no-ganglia", action="store_false", dest="ganglia",
help="Disable Ganglia monitoring for the cluster")
- parser.add_option("--old-scripts", action="store_true", default=False,
- help="Use old mesos-ec2 scripts, for Spark <= 0.6 AMIs")
parser.add_option("-u", "--user", default="root",
help="The SSH user you want to connect as (default: root)")
parser.add_option("--delete-groups", action="store_true", default=False,
@@ -109,9 +110,6 @@ def parse_args():
print >> stderr, ("ERROR: The -i or --identity-file argument is " +
"required for " + action)
sys.exit(1)
- if opts.cluster_type not in ["mesos", "standalone"] and action == "launch":
- print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type)
- sys.exit(1)
# Boto config check
# http://boto.cloudhackers.com/en/latest/boto_config_tut.html
@@ -158,66 +156,94 @@ def wait_for_instances(conn, instances):
def is_active(instance):
return (instance.state in ['pending', 'running', 'stopping', 'stopped'])
+# Return correct versions of Spark and Shark, given the supplied Spark version
+def get_spark_shark_version(opts):
+ spark_shark_map = {"0.7.2": "0.7.0"}
+ version = opts.spark_version.replace("v", "")
+ if version not in spark_shark_map:
+ print >> stderr, "Don't know about Spark version: %s" % version
+ sys.exit(1)
+ return (version, spark_shark_map[version])
+
+# Attempt to resolve an appropriate AMI given the architecture and
+# region of the request.
+def get_spark_ami(opts):
+ instance_types = {
+ "m1.small": "pvm",
+ "m1.medium": "pvm",
+ "m1.large": "pvm",
+ "m1.xlarge": "pvm",
+ "t1.micro": "pvm",
+ "c1.medium": "pvm",
+ "c1.xlarge": "pvm",
+ "m2.xlarge": "pvm",
+ "m2.2xlarge": "pvm",
+ "m2.4xlarge": "pvm",
+ "cc1.4xlarge": "hvm",
+ "cc2.8xlarge": "hvm",
+ "cg1.4xlarge": "hvm",
+ "hs1.8xlarge": "hvm",
+ "hi1.4xlarge": "hvm",
+ "m3.xlarge": "hvm",
+ "m3.2xlarge": "hvm",
+ "cr1.8xlarge": "hvm"
+ }
+ if opts.instance_type in instance_types:
+ instance_type = instance_types[opts.instance_type]
+ else:
+ instance_type = "pvm"
+ print >> stderr,\
+ "Don't recognize %s, assuming type is pvm" % opts.instance_type
+
+ ami_path = "%s/%s/%s" % (AMI_PREFIX, opts.region, instance_type)
+ try:
+ ami = urllib2.urlopen(ami_path).read().strip()
+ print "Spark AMI: " + ami
+ except:
+ print >> stderr, "Could not resolve AMI at: " + ami_path
+ sys.exit(1)
+
+ return ami
# Launch a cluster of the given name, by setting up its security groups,
# and then starting new instances in them.
-# Returns a tuple of EC2 reservation objects for the master, slave
-# and zookeeper instances (in that order).
+# Returns a tuple of EC2 reservation objects for the master and slaves
# Fails if there are already instances running in the cluster's groups.
def launch_cluster(conn, opts, cluster_name):
print "Setting up security groups..."
master_group = get_or_make_group(conn, cluster_name + "-master")
slave_group = get_or_make_group(conn, cluster_name + "-slaves")
- zoo_group = get_or_make_group(conn, cluster_name + "-zoo")
if master_group.rules == []: # Group was just now created
master_group.authorize(src_group=master_group)
master_group.authorize(src_group=slave_group)
- master_group.authorize(src_group=zoo_group)
master_group.authorize('tcp', 22, 22, '0.0.0.0/0')
master_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
- if opts.cluster_type == "mesos":
- master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0')
if opts.ganglia:
master_group.authorize('tcp', 5080, 5080, '0.0.0.0/0')
if slave_group.rules == []: # Group was just now created
slave_group.authorize(src_group=master_group)
slave_group.authorize(src_group=slave_group)
- slave_group.authorize(src_group=zoo_group)
slave_group.authorize('tcp', 22, 22, '0.0.0.0/0')
slave_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0')
slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0')
slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
- if zoo_group.rules == []: # Group was just now created
- zoo_group.authorize(src_group=master_group)
- zoo_group.authorize(src_group=slave_group)
- zoo_group.authorize(src_group=zoo_group)
- zoo_group.authorize('tcp', 22, 22, '0.0.0.0/0')
- zoo_group.authorize('tcp', 2181, 2181, '0.0.0.0/0')
- zoo_group.authorize('tcp', 2888, 2888, '0.0.0.0/0')
- zoo_group.authorize('tcp', 3888, 3888, '0.0.0.0/0')
# Check if instances are already running in our groups
active_nodes = get_existing_cluster(conn, opts, cluster_name,
die_on_error=False)
if any(active_nodes):
print >> stderr, ("ERROR: There are already instances running in " +
- "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
+ "group %s or %s" % (master_group.name, slave_group.name))
sys.exit(1)
- # Figure out the latest AMI from our static URL
- if opts.ami == "latest":
- try:
- opts.ami = urllib2.urlopen(LATEST_AMI_URL).read().strip()
- print "Latest Spark AMI: " + opts.ami
- except:
- print >> stderr, "Could not read " + LATEST_AMI_URL
- sys.exit(1)
-
+ # Figure out Spark AMI
+ if opts.ami is None:
+ opts.ami = get_spark_ami(opts)
print "Launching instances..."
try:
@@ -284,9 +310,9 @@ def launch_cluster(conn, opts, cluster_name):
print "Canceling spot instance requests"
conn.cancel_spot_instance_requests(my_req_ids)
# Log a warning if any of these requests actually launched instances:
- (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ (master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
- running = len(master_nodes) + len(slave_nodes) + len(zoo_nodes)
+ running = len(master_nodes) + len(slave_nodes)
if running:
print >> stderr, ("WARNING: %d instances are still running" % running)
sys.exit(0)
@@ -327,21 +353,17 @@ def launch_cluster(conn, opts, cluster_name):
master_nodes = master_res.instances
print "Launched master in %s, regid = %s" % (zone, master_res.id)
- zoo_nodes = []
-
# Return all the instances
- return (master_nodes, slave_nodes, zoo_nodes)
+ return (master_nodes, slave_nodes)
# Get the EC2 instances in an existing cluster if available.
-# Returns a tuple of lists of EC2 instance objects for the masters,
-# slaves and zookeeper nodes (in that order).
+# Returns a tuple of lists of EC2 instance objects for the masters and slaves
def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
print "Searching for existing cluster " + cluster_name + "..."
reservations = conn.get_all_instances()
master_nodes = []
slave_nodes = []
- zoo_nodes = []
for res in reservations:
active = [i for i in res.instances if is_active(i)]
if len(active) > 0:
@@ -350,13 +372,11 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
master_nodes += res.instances
elif group_names == [cluster_name + "-slaves"]:
slave_nodes += res.instances
- elif group_names == [cluster_name + "-zoo"]:
- zoo_nodes += res.instances
- if any((master_nodes, slave_nodes, zoo_nodes)):
- print ("Found %d master(s), %d slaves, %d ZooKeeper nodes" %
- (len(master_nodes), len(slave_nodes), len(zoo_nodes)))
+ if any((master_nodes, slave_nodes)):
+ print ("Found %d master(s), %d slaves" %
+ (len(master_nodes), len(slave_nodes)))
if (master_nodes != [] and slave_nodes != []) or not die_on_error:
- return (master_nodes, slave_nodes, zoo_nodes)
+ return (master_nodes, slave_nodes)
else:
if master_nodes == [] and slave_nodes != []:
print "ERROR: Could not find master in group " + cluster_name + "-master"
@@ -369,7 +389,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.
-def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_key):
+def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
master = master_nodes[0].public_dns_name
if deploy_ssh_key:
print "Copying SSH key %s to master..." % opts.identity_file
@@ -377,38 +397,28 @@ def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_k
scp(master, opts, opts.identity_file, '~/.ssh/id_rsa')
ssh(master, opts, 'chmod 600 ~/.ssh/id_rsa')
- if opts.cluster_type == "mesos":
- modules = ['ephemeral-hdfs', 'persistent-hdfs', 'mesos']
- elif opts.cluster_type == "standalone":
- modules = ['ephemeral-hdfs', 'persistent-hdfs', 'spark-standalone']
+ modules = ['spark', 'shark', 'ephemeral-hdfs', 'persistent-hdfs',
+ 'mapreduce', 'spark-standalone']
+
+ if opts.hadoop_major_version == "1":
+ modules = filter(lambda x: x != "mapreduce", modules)
if opts.ganglia:
modules.append('ganglia')
- if not opts.old_scripts:
- # NOTE: We should clone the repository before running deploy_files to
- # prevent ec2-variables.sh from being overwritten
- ssh(master, opts, "rm -rf spark-ec2 && git clone https://github.com/mesos/spark-ec2.git")
+ # NOTE: We should clone the repository before running deploy_files to
+ # prevent ec2-variables.sh from being overwritten
+ # TODO: Before being merged this should be replaced with the correct repo,
+ # and likely a new branch (to allow backwards compatibility).
+ ssh(master, opts, "rm -rf spark-ec2 && git clone https://github.com/pwendell/spark-ec2.git -b ec2-updates")
print "Deploying files to master..."
- deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes,
- zoo_nodes, modules)
+ deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes, modules)
print "Running setup on master..."
- if opts.old_scripts:
- if opts.cluster_type == "mesos":
- setup_mesos_cluster(master, opts)
- elif opts.cluster_type == "standalone":
- setup_standalone_cluster(master, slave_nodes, opts)
- else:
- setup_spark_cluster(master, opts)
+ setup_spark_cluster(master, opts)
print "Done!"
-def setup_mesos_cluster(master, opts):
- ssh(master, opts, "chmod u+x mesos-ec2/setup")
- ssh(master, opts, "mesos-ec2/setup %s %s %s %s" %
- ("generic", "none", "master", opts.swap))
-
def setup_standalone_cluster(master, slave_nodes, opts):
slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes])
ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips))
@@ -417,23 +427,18 @@ def setup_standalone_cluster(master, slave_nodes, opts):
def setup_spark_cluster(master, opts):
ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
ssh(master, opts, "spark-ec2/setup.sh")
- if opts.cluster_type == "mesos":
- print "Mesos cluster started at http://%s:8080" % master
- elif opts.cluster_type == "standalone":
- print "Spark standalone cluster started at http://%s:8080" % master
+ print "Spark standalone cluster started at http://%s:8080" % master
if opts.ganglia:
print "Ganglia started at http://%s:5080/ganglia" % master
# Wait for a whole cluster (masters and slaves) to start up
-def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes, zoo_nodes):
+def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes):
print "Waiting for instances to start up..."
time.sleep(5)
wait_for_instances(conn, master_nodes)
wait_for_instances(conn, slave_nodes)
- if zoo_nodes != []:
- wait_for_instances(conn, zoo_nodes)
print "Waiting %d more seconds..." % wait_secs
time.sleep(wait_secs)
@@ -454,7 +459,12 @@ def get_num_disks(instance_type):
"m2.4xlarge": 2,
"cc1.4xlarge": 2,
"cc2.8xlarge": 4,
- "cg1.4xlarge": 2
+ "cg1.4xlarge": 2,
+ "hs1.8xlarge": 24,
+ "cr1.8xlarge": 2,
+ "hi1.4xlarge": 2,
+ "m3.xlarge": 0,
+ "m3.2xlarge": 0
}
if instance_type in disks_by_instance:
return disks_by_instance[instance_type]
@@ -469,8 +479,7 @@ def get_num_disks(instance_type):
# cluster (e.g. lists of masters and slaves). Files are only deployed to
# the first master instance in the cluster, and we expect the setup
# script to be run on that instance to copy them to other nodes.
-def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes,
- modules):
+def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
active_master = master_nodes[0].public_dns_name
num_disks = get_num_disks(opts.instance_type)
@@ -483,28 +492,30 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes,
mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
spark_local_dirs += ",/mnt%d/spark" % i
- if zoo_nodes != []:
- zoo_list = '\n'.join([i.public_dns_name for i in zoo_nodes])
- cluster_url = "zoo://" + ",".join(
- ["%s:2181/mesos" % i.public_dns_name for i in zoo_nodes])
- elif opts.cluster_type == "mesos":
- zoo_list = "NONE"
- cluster_url = "%s:5050" % active_master
- elif opts.cluster_type == "standalone":
- zoo_list = "NONE"
- cluster_url = "%s:7077" % active_master
+ cluster_url = "%s:7077" % active_master
+
+ if "." in opts.spark_version:
+ # Pre-built spark & shark deploy
+ (spark_v, shark_v) = get_spark_shark_version(opts)
+ else:
+ # Spark-only custom deploy
+ spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version)
+ shark_v = ""
+ modules = filter(lambda x: x != "shark", modules)
template_vars = {
"master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
"active_master": active_master,
"slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
- "zoo_list": zoo_list,
"cluster_url": cluster_url,
"hdfs_data_dirs": hdfs_data_dirs,
"mapred_local_dirs": mapred_local_dirs,
"spark_local_dirs": spark_local_dirs,
"swap": str(opts.swap),
- "modules": '\n'.join(modules)
+ "modules": '\n'.join(modules),
+ "spark_version": spark_v,
+ "shark_version": shark_v,
+ "hadoop_major_version": opts.hadoop_major_version
}
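The version branching above can be sketched in isolation; a minimal sketch assuming the same "." -in-version heuristic and the 0.7.2 → 0.7.0 Spark/Shark mapping from `get_spark_shark_version` (the git hash below is hypothetical):

```python
# A release number maps to a matching pre-built Shark; a git hash
# triggers a Spark-only custom deploy and drops the shark module.
spark_shark_map = {"0.7.2": "0.7.0"}

def resolve_versions(spark_version, spark_git_repo, modules):
    if "." in spark_version:
        # Pre-built Spark & Shark deploy
        version = spark_version.replace("v", "")
        return (version, spark_shark_map[version], modules)
    # Spark-only custom deploy: "repo|hash", no Shark
    spark_v = "%s|%s" % (spark_git_repo, spark_version)
    return (spark_v, "", [m for m in modules if m != "shark"])

mods = ["spark", "shark", "ephemeral-hdfs"]
print(resolve_versions("v0.7.2", "https://github.com/mesos/spark", mods))
print(resolve_versions("8f11c36", "https://github.com/mesos/spark", mods))
```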
# Create a temp directory in which we will place all the files to be
@@ -593,20 +604,20 @@ def main():
if action == "launch":
if opts.resume:
- (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ (master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name)
else:
- (master_nodes, slave_nodes, zoo_nodes) = launch_cluster(
+ (master_nodes, slave_nodes) = launch_cluster(
conn, opts, cluster_name)
- wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes, zoo_nodes)
- setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, True)
+ wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
+ setup_cluster(conn, master_nodes, slave_nodes, opts, True)
elif action == "destroy":
response = raw_input("Are you sure you want to destroy the cluster " +
cluster_name + "?\nALL DATA ON ALL NODES WILL BE LOST!!\n" +
"Destroy cluster " + cluster_name + " (y/N): ")
if response == "y":
- (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ (master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
print "Terminating master..."
for inst in master_nodes:
@@ -614,15 +625,11 @@ def main():
print "Terminating slaves..."
for inst in slave_nodes:
inst.terminate()
- if zoo_nodes != []:
- print "Terminating zoo..."
- for inst in zoo_nodes:
- inst.terminate()
# Delete security groups as well
if opts.delete_groups:
print "Deleting security groups (this will take some time)..."
- group_names = [cluster_name + "-master", cluster_name + "-slaves", cluster_name + "-zoo"]
+ group_names = [cluster_name + "-master", cluster_name + "-slaves"]
attempt = 1;
while attempt <= 3:
@@ -662,7 +669,7 @@ def main():
print "Try re-running in a few minutes."
elif action == "login":
- (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ (master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name)
master = master_nodes[0].public_dns_name
print "Logging into master " + master + "..."
@@ -673,7 +680,7 @@ def main():
(opts.identity_file, proxy_opt, opts.user, master), shell=True)
elif action == "get-master":
- (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(conn, opts, cluster_name)
+ (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
print master_nodes[0].public_dns_name
elif action == "stop":
@@ -683,7 +690,7 @@ def main():
"AMAZON EBS IF IT IS EBS-BACKED!!\n" +
"Stop cluster " + cluster_name + " (y/N): ")
if response == "y":
- (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+ (master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
print "Stopping master..."
for inst in master_nodes:
@@ -693,15 +700,9 @@ def main():
for inst in slave_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.stop()
- if zoo_nodes != []:
- print "Stopping zoo..."
- for inst in zoo_nodes:
- if inst.state not in ["shutting-down", "terminated"]:
- inst.stop()
elif action == "start":
- (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
- conn, opts, cluster_name)
+ (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
print "Starting slaves..."
for inst in slave_nodes:
if inst.state not in ["shutting-down", "terminated"]:
@@ -710,13 +711,8 @@ def main():
for inst in master_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
- if zoo_nodes != []:
- print "Starting zoo..."
- for inst in zoo_nodes:
- if inst.state not in ["shutting-down", "terminated"]:
- inst.start()
- wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes, zoo_nodes)
- setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, False)
+ wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
+ setup_cluster(conn, master_nodes, slave_nodes, opts, False)
else:
print >> stderr, "Invalid action: %s" % action