author     Nicholas Chammas <nicholas.chammas@gmail.com>    2015-01-08 17:42:08 -0800
committer  Andrew Or <andrew@databricks.com>                2015-01-08 17:42:08 -0800
commit     167a5ab0bd1d37f3ac23bec49e484a238610cf75 (patch)
tree       60715154e51b86d83dd460da1ee69c3cb488e666 /ec2
parent     48cecf673c38ead56afa2dea49d295165c67cdf4 (diff)
download   spark-167a5ab0bd1d37f3ac23bec49e484a238610cf75.tar.gz
           spark-167a5ab0bd1d37f3ac23bec49e484a238610cf75.tar.bz2
           spark-167a5ab0bd1d37f3ac23bec49e484a238610cf75.zip
[SPARK-5122] Remove Shark from spark-ec2
I moved the Spark-Shark version map [to the wiki](https://cwiki.apache.org/confluence/display/SPARK/Spark-Shark+version+mapping). This PR has a [matching PR in mesos/spark-ec2](https://github.com/mesos/spark-ec2/pull/89).

Author: Nicholas Chammas <nicholas.chammas@gmail.com>

Closes #3939 from nchammas/remove-shark and squashes the following commits:

66e0841 [Nicholas Chammas] fix style
ceeab85 [Nicholas Chammas] show default Spark GitHub repo
7270126 [Nicholas Chammas] validate Spark hashes
db4935d [Nicholas Chammas] validate spark version upfront
fc0d5b9 [Nicholas Chammas] remove Shark
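As a quick illustration of the validation flow this commit introduces (see get_validate_spark_version in the diff below): a release string is stripped of any leading "v" and checked against VALID_SPARK_VERSIONS, while anything without a "." is treated as a git commit hash and probed with an HTTP HEAD request against the configured repo. The calls below are a minimal sketch and are not part of the patch; the commit hash shown is this commit's parent, and the failing version number is made up for the example.

# Illustrative calls only; not part of the patch itself.

# A recognized release, with or without a leading "v", passes the
# VALID_SPARK_VERSIONS check and is returned in normalized form:
get_validate_spark_version("v1.2.0", DEFAULT_SPARK_GITHUB_REPO)   # returns "1.2.0"

# A string without a "." is treated as a git commit hash and verified
# by sending an HTTP HEAD request to <repo>/commit/<hash>:
get_validate_spark_version(
    "48cecf673c38ead56afa2dea49d295165c67cdf4",
    "https://github.com/apache/spark")

# An unknown release number (hypothetical here) prints an error to
# stderr and exits with status 1:
get_validate_spark_version("1.9.9", DEFAULT_SPARK_GITHUB_REPO)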
Diffstat (limited to 'ec2')
-rwxr-xr-x  ec2/spark_ec2.py  78
1 file changed, 44 insertions, 34 deletions
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 485eea4f5e..abab209a05 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -39,10 +39,26 @@ from datetime import datetime
from optparse import OptionParser
from sys import stderr
+VALID_SPARK_VERSIONS = set([
+ "0.7.3",
+ "0.8.0",
+ "0.8.1",
+ "0.9.0",
+ "0.9.1",
+ "0.9.2",
+ "1.0.0",
+ "1.0.1",
+ "1.0.2",
+ "1.1.0",
+ "1.1.1",
+ "1.2.0",
+])
+
DEFAULT_SPARK_VERSION = "1.2.0"
+DEFAULT_SPARK_GITHUB_REPO = "https://github.com/apache/spark"
SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))
-
MESOS_SPARK_EC2_BRANCH = "branch-1.3"
+
# A URL prefix from which to fetch AMI information
AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/{b}/ami-list".format(b=MESOS_SPARK_EC2_BRANCH)
@@ -126,8 +142,8 @@ def parse_args():
help="Version of Spark to use: 'X.Y.Z' or a specific git hash (default: %default)")
parser.add_option(
"--spark-git-repo",
- default="https://github.com/apache/spark",
- help="Github repo from which to checkout supplied commit hash")
+ default=DEFAULT_SPARK_GITHUB_REPO,
+ help="Github repo from which to checkout supplied commit hash (default: %default)")
parser.add_option(
"--hadoop-major-version", default="1",
help="Major version of Hadoop (default: %default)")
@@ -236,6 +252,26 @@ def get_or_make_group(conn, name, vpc_id):
return conn.create_security_group(name, "Spark EC2 group", vpc_id)
+def get_validate_spark_version(version, repo):
+ if "." in version:
+ version = version.replace("v", "")
+ if version not in VALID_SPARK_VERSIONS:
+ print >> stderr, "Don't know about Spark version: {v}".format(v=version)
+ sys.exit(1)
+ return version
+ else:
+ github_commit_url = "{repo}/commit/{commit_hash}".format(repo=repo, commit_hash=version)
+ request = urllib2.Request(github_commit_url)
+ request.get_method = lambda: 'HEAD'
+ try:
+ response = urllib2.urlopen(request)
+ except urllib2.HTTPError, e:
+ print >> stderr, "Couldn't validate Spark commit: {url}".format(url=github_commit_url)
+ print >> stderr, "Received HTTP response code of {code}.".format(code=e.code)
+ sys.exit(1)
+ return version
+
+
# Check whether a given EC2 instance object is in a state we consider active,
# i.e. not terminating or terminated. We count both stopping and stopped as
# active since we can restart stopped clusters.
@@ -243,29 +279,6 @@ def is_active(instance):
return (instance.state in ['pending', 'running', 'stopping', 'stopped'])
-# Return correct versions of Spark and Shark, given the supplied Spark version
-def get_spark_shark_version(opts):
- spark_shark_map = {
- "0.7.3": "0.7.1",
- "0.8.0": "0.8.0",
- "0.8.1": "0.8.1",
- "0.9.0": "0.9.0",
- "0.9.1": "0.9.1",
- # These are dummy versions (no Shark versions after this)
- "1.0.0": "1.0.0",
- "1.0.1": "1.0.1",
- "1.0.2": "1.0.2",
- "1.1.0": "1.1.0",
- "1.1.1": "1.1.1",
- "1.2.0": "1.2.0",
- }
- version = opts.spark_version.replace("v", "")
- if version not in spark_shark_map:
- print >> stderr, "Don't know about Spark version: %s" % version
- sys.exit(1)
- return (version, spark_shark_map[version])
-
-
# Attempt to resolve an appropriate AMI given the architecture and region of the request.
# Source: http://aws.amazon.com/amazon-linux-ami/instance-type-matrix/
# Last Updated: 2014-06-20
@@ -619,7 +632,7 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
print slave.public_dns_name
ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar)
- modules = ['spark', 'shark', 'ephemeral-hdfs', 'persistent-hdfs',
+ modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
'mapreduce', 'spark-standalone', 'tachyon']
if opts.hadoop_major_version == "1":
@@ -706,9 +719,7 @@ def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
sys.stdout.flush()
start_time = datetime.now()
-
num_attempts = 0
- conn = ec2.connect_to_region(opts.region)
while True:
time.sleep(5 * num_attempts) # seconds
@@ -815,13 +826,11 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
cluster_url = "%s:7077" % active_master
if "." in opts.spark_version:
- # Pre-built spark & shark deploy
- (spark_v, shark_v) = get_spark_shark_version(opts)
+ # Pre-built Spark deploy
+ spark_v = get_validate_spark_version(opts.spark_version, opts.spark_git_repo)
else:
# Spark-only custom deploy
spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version)
- shark_v = ""
- modules = filter(lambda x: x != "shark", modules)
template_vars = {
"master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
@@ -834,7 +843,6 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
"swap": str(opts.swap),
"modules": '\n'.join(modules),
"spark_version": spark_v,
- "shark_version": shark_v,
"hadoop_major_version": opts.hadoop_major_version,
"spark_worker_instances": "%d" % opts.worker_instances,
"spark_master_opts": opts.master_opts
@@ -983,6 +991,8 @@ def real_main():
(opts, action, cluster_name) = parse_args()
# Input parameter validation
+ get_validate_spark_version(opts.spark_version, opts.spark_git_repo)
+
if opts.wait is not None:
# NOTE: DeprecationWarnings are silent in 2.7+ by default.
# To show them, run Python with the -Wdefault switch.