-rw-r--r--  .gitignore                                              1
-rwxr-xr-x  dev/create-release/release-tag.sh                       3
-rwxr-xr-x  dev/create-release/releaseutils.py                      1
-rwxr-xr-x  dev/lint-python                                         2
-rw-r--r--  dev/sparktestsupport/modules.py                         9
-rwxr-xr-x  docs/_layouts/global.html                               2
-rw-r--r--  docs/cluster-overview.md                                2
-rw-r--r--  docs/ec2-scripts.md                                   192
-rw-r--r--  docs/index.md                                           5
-rw-r--r--  ec2/README                                              4
-rw-r--r--  ec2/deploy.generic/root/spark-ec2/ec2-variables.sh     34
-rwxr-xr-x  ec2/spark-ec2                                          25
-rwxr-xr-x  ec2/spark_ec2.py                                     1530
-rwxr-xr-x  make-distribution.sh                                    1
14 files changed, 3 insertions, 1808 deletions
diff --git a/.gitignore b/.gitignore
index 07524bc429..8ecf536e79 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,7 +60,6 @@ dev/create-release/*final
spark-*-bin-*.tgz
unit-tests.log
/lib/
-ec2/lib/
rat-results.txt
scalastyle.txt
scalastyle-output.xml
diff --git a/dev/create-release/release-tag.sh b/dev/create-release/release-tag.sh
index b0a3374bec..d404939d1c 100755
--- a/dev/create-release/release-tag.sh
+++ b/dev/create-release/release-tag.sh
@@ -64,9 +64,6 @@ git commit -a -m "Preparing Spark release $RELEASE_TAG"
echo "Creating tag $RELEASE_TAG at the head of $GIT_BRANCH"
git tag $RELEASE_TAG
-# TODO: It would be nice to do some verifications here
-# i.e. check whether ec2 scripts have the new version
-
# Create next version
$MVN versions:set -DnewVersion=$NEXT_VERSION | grep -v "no value" # silence logs
git commit -a -m "Preparing development version $NEXT_VERSION"
diff --git a/dev/create-release/releaseutils.py b/dev/create-release/releaseutils.py
index 7f152b7f53..5d0ac16b3b 100755
--- a/dev/create-release/releaseutils.py
+++ b/dev/create-release/releaseutils.py
@@ -159,7 +159,6 @@ known_components = {
"build": CORE_COMPONENT,
"deploy": CORE_COMPONENT,
"documentation": CORE_COMPONENT,
- "ec2": "EC2",
"examples": CORE_COMPONENT,
"graphx": "GraphX",
"input/output": CORE_COMPONENT,
diff --git a/dev/lint-python b/dev/lint-python
index 0b97213ae3..1765a07d2f 100755
--- a/dev/lint-python
+++ b/dev/lint-python
@@ -19,7 +19,7 @@
SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
SPARK_ROOT_DIR="$(dirname "$SCRIPT_DIR")"
-PATHS_TO_CHECK="./python/pyspark/ ./ec2/spark_ec2.py ./examples/src/main/python/ ./dev/sparktestsupport"
+PATHS_TO_CHECK="./python/pyspark/ ./examples/src/main/python/ ./dev/sparktestsupport"
PATHS_TO_CHECK="$PATHS_TO_CHECK ./dev/run-tests.py ./python/run-tests.py ./dev/run-tests-jenkins.py"
PEP8_REPORT_PATH="$SPARK_ROOT_DIR/dev/pep8-report.txt"
PYLINT_REPORT_PATH="$SPARK_ROOT_DIR/dev/pylint-report.txt"
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 47cd600bd1..1fc6596164 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -406,15 +406,6 @@ build = Module(
should_run_build_tests=True
)
-ec2 = Module(
- name="ec2",
- dependencies=[],
- source_file_regexes=[
- "ec2/",
- ]
-)
-
-
yarn = Module(
name="yarn",
dependencies=[],
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index 62d75eff71..d493f62f0e 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -98,8 +98,6 @@
<li><a href="spark-standalone.html">Spark Standalone</a></li>
<li><a href="running-on-mesos.html">Mesos</a></li>
<li><a href="running-on-yarn.html">YARN</a></li>
- <li class="divider"></li>
- <li><a href="ec2-scripts.html">Amazon EC2</a></li>
</ul>
</li>
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index faaf154d24..2810112f52 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -53,8 +53,6 @@ The system currently supports three cluster managers:
and service applications.
* [Hadoop YARN](running-on-yarn.html) -- the resource manager in Hadoop 2.
-In addition, Spark's [EC2 launch scripts](ec2-scripts.html) make it easy to launch a standalone
-cluster on Amazon EC2.
# Submitting Applications
diff --git a/docs/ec2-scripts.md b/docs/ec2-scripts.md
deleted file mode 100644
index 7f60f82b96..0000000000
--- a/docs/ec2-scripts.md
+++ /dev/null
@@ -1,192 +0,0 @@
----
-layout: global
-title: Running Spark on EC2
----
-
-The `spark-ec2` script, located in Spark's `ec2` directory, allows you
-to launch, manage and shut down Spark clusters on Amazon EC2. It automatically
-sets up Spark and HDFS on the cluster for you. This guide describes
-how to use `spark-ec2` to launch clusters, how to run jobs on them, and how
-to shut them down. It assumes you've already signed up for an EC2 account
-on the [Amazon Web Services site](http://aws.amazon.com/).
-
-`spark-ec2` is designed to manage multiple named clusters. You can
-launch a new cluster (telling the script its size and giving it a name),
-shutdown an existing cluster, or log into a cluster. Each cluster is
-identified by placing its machines into EC2 security groups whose names
-are derived from the name of the cluster. For example, a cluster named
-`test` will contain a master node in a security group called
-`test-master`, and a number of slave nodes in a security group called
-`test-slaves`. The `spark-ec2` script will create these security groups
-for you based on the cluster name you request. You can also use them to
-identify machines belonging to each cluster in the Amazon EC2 Console.
-
-
-# Before You Start
-
-- Create an Amazon EC2 key pair for yourself. This can be done by
- logging into your Amazon Web Services account through the [AWS
- console](http://aws.amazon.com/console/), clicking Key Pairs on the
- left sidebar, and creating and downloading a key. Make sure that you
- set the permissions for the private key file to `600` (i.e. only you
- can read and write it) so that `ssh` will work.
-- Whenever you want to use the `spark-ec2` script, set the environment
- variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` to your
- Amazon EC2 access key ID and secret access key. These can be
- obtained from the [AWS homepage](http://aws.amazon.com/) by clicking
- Account \> Security Credentials \> Access Credentials.
-
-# Launching a Cluster
-
-- Go into the `ec2` directory in the release of Spark you downloaded.
-- Run
- `./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> launch <cluster-name>`,
- where `<keypair>` is the name of your EC2 key pair (that you gave it
- when you created it), `<key-file>` is the private key file for your
- key pair, `<num-slaves>` is the number of slave nodes to launch (try
- 1 at first), and `<cluster-name>` is the name to give to your
- cluster.
-
- For example:
-
- ```bash
- export AWS_SECRET_ACCESS_KEY=AaBbCcDdEeFGgHhIiJjKkLlMmNnOoPpQqRrSsTtU
-export AWS_ACCESS_KEY_ID=ABCDEFG1234567890123
-./spark-ec2 --key-pair=awskey --identity-file=awskey.pem --region=us-west-1 --zone=us-west-1a launch my-spark-cluster
- ```
-
-- After everything launches, check that the cluster scheduler is up and sees
- all the slaves by going to its web UI, which will be printed at the end of
- the script (typically `http://<master-hostname>:8080`).
-
-You can also run `./spark-ec2 --help` to see more usage options. The
-following options are worth pointing out:
-
-- `--instance-type=<instance-type>` can be used to specify an EC2
-instance type to use. For now, the script only supports 64-bit instance
-types, and the default type is `m1.large` (which has 2 cores and 7.5 GB
-RAM). Refer to the Amazon pages about [EC2 instance
-types](http://aws.amazon.com/ec2/instance-types) and [EC2
-pricing](http://aws.amazon.com/ec2/#pricing) for information about other
-instance types.
-- `--region=<ec2-region>` specifies an EC2 region in which to launch
-instances. The default region is `us-east-1`.
-- `--zone=<ec2-zone>` can be used to specify an EC2 availability zone
-to launch instances in. Sometimes, you will get an error because there
-is not enough capacity in one zone, and you should try to launch in
-another.
-- `--ebs-vol-size=<GB>` will attach an EBS volume with a given amount
- of space to each node so that you can have a persistent HDFS cluster
- on your nodes across cluster restarts (see below).
-- `--spot-price=<price>` will launch the worker nodes as
- [Spot Instances](http://aws.amazon.com/ec2/spot-instances/),
- bidding for the given maximum price (in dollars).
-- `--spark-version=<version>` will pre-load the cluster with the
- specified version of Spark. The `<version>` can be a version number
- (e.g. "0.7.3") or a specific git hash. By default, a recent
- version will be used.
-- `--spark-git-repo=<repository url>` will let you run a custom version of
- Spark that is built from the given git repository. By default, the
- [Apache Github mirror](https://github.com/apache/spark) will be used.
- When using a custom Spark version, `--spark-version` must be set to git
- commit hash, such as 317e114, instead of a version number.
-- If one of your launches fails due to e.g. not having the right
-permissions on your private key file, you can run `launch` with the
-`--resume` option to restart the setup process on an existing cluster.
-
-# Launching a Cluster in a VPC
-
-- Run
- `./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> --vpc-id=<vpc-id> --subnet-id=<subnet-id> launch <cluster-name>`,
- where `<keypair>` is the name of your EC2 key pair (that you gave it
- when you created it), `<key-file>` is the private key file for your
- key pair, `<num-slaves>` is the number of slave nodes to launch (try
- 1 at first), `<vpc-id>` is the name of your VPC, `<subnet-id>` is the
- name of your subnet, and `<cluster-name>` is the name to give to your
- cluster.
-
- For example:
-
- ```bash
- export AWS_SECRET_ACCESS_KEY=AaBbCcDdEeFGgHhIiJjKkLlMmNnOoPpQqRrSsTtU
-export AWS_ACCESS_KEY_ID=ABCDEFG1234567890123
-./spark-ec2 --key-pair=awskey --identity-file=awskey.pem --region=us-west-1 --zone=us-west-1a --vpc-id=vpc-a28d24c7 --subnet-id=subnet-4eb27b39 --spark-version=1.1.0 launch my-spark-cluster
- ```
-
-# Running Applications
-
-- Go into the `ec2` directory in the release of Spark you downloaded.
-- Run `./spark-ec2 -k <keypair> -i <key-file> login <cluster-name>` to
- SSH into the cluster, where `<keypair>` and `<key-file>` are as
- above. (This is just for convenience; you could also use
- the EC2 console.)
-- To deploy code or data within your cluster, you can log in and use the
- provided script `~/spark-ec2/copy-dir`, which,
- given a directory path, RSYNCs it to the same location on all the slaves.
-- If your application needs to access large datasets, the fastest way to do
- that is to load them from Amazon S3 or an Amazon EBS device into an
- instance of the Hadoop Distributed File System (HDFS) on your nodes.
- The `spark-ec2` script already sets up a HDFS instance for you. It's
- installed in `/root/ephemeral-hdfs`, and can be accessed using the
- `bin/hadoop` script in that directory. Note that the data in this
- HDFS goes away when you stop and restart a machine.
-- There is also a *persistent HDFS* instance in
- `/root/persistent-hdfs` that will keep data across cluster restarts.
- Typically each node has relatively little space of persistent data
- (about 3 GB), but you can use the `--ebs-vol-size` option to
- `spark-ec2` to attach a persistent EBS volume to each node for
- storing the persistent HDFS.
-- Finally, if you get errors while running your application, look at the slave's logs
- for that application inside of the scheduler work directory (/root/spark/work). You can
- also view the status of the cluster using the web UI: `http://<master-hostname>:8080`.
-
-# Configuration
-
-You can edit `/root/spark/conf/spark-env.sh` on each machine to set Spark configuration options, such
-as JVM options. This file needs to be copied to **every machine** to reflect the change. The easiest way to
-do this is to use a script we provide called `copy-dir`. First edit your `spark-env.sh` file on the master,
-then run `~/spark-ec2/copy-dir /root/spark/conf` to RSYNC it to all the workers.
-
-The [configuration guide](configuration.html) describes the available configuration options.
-
-# Terminating a Cluster
-
-***Note that there is no way to recover data on EC2 nodes after shutting
-them down! Make sure you have copied everything important off the nodes
-before stopping them.***
-
-- Go into the `ec2` directory in the release of Spark you downloaded.
-- Run `./spark-ec2 destroy <cluster-name>`.
-
-# Pausing and Restarting Clusters
-
-The `spark-ec2` script also supports pausing a cluster. In this case,
-the VMs are stopped but not terminated, so they
-***lose all data on ephemeral disks*** but keep the data in their
-root partitions and their `persistent-hdfs`. Stopped machines will not
-cost you any EC2 cycles, but ***will*** continue to cost money for EBS
-storage.
-
-- To stop one of your clusters, go into the `ec2` directory and run
-`./spark-ec2 --region=<ec2-region> stop <cluster-name>`.
-- To restart it later, run
-`./spark-ec2 -i <key-file> --region=<ec2-region> start <cluster-name>`.
-- To ultimately destroy the cluster and stop consuming EBS space, run
-`./spark-ec2 --region=<ec2-region> destroy <cluster-name>` as described in the previous
-section.
-
-# Limitations
-
-- Support for "cluster compute" nodes is limited -- there's no way to specify a
- locality group. However, you can launch slave nodes in your
- `<clusterName>-slaves` group manually and then use `spark-ec2 launch
- --resume` to start a cluster with them.
-
-If you have a patch or suggestion for one of these limitations, feel free to
-[contribute](contributing-to-spark.html) it!
-
-# Accessing Data in S3
-
-Spark's file interface allows it to process data in Amazon S3 using the same URI formats that are supported for Hadoop. You can specify a path in S3 as input through a URI of the form `s3n://<bucket>/path`. To provide AWS credentials for S3 access, launch the Spark cluster with the option `--copy-aws-credentials`. Full instructions on S3 access using the Hadoop input libraries can be found on the [Hadoop S3 page](http://wiki.apache.org/hadoop/AmazonS3).
-
-In addition to using a single input file, you can also use a directory of files as input by simply giving the path to the directory.
diff --git a/docs/index.md b/docs/index.md
index ae26f97c86..9dfc52a2bd 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -64,7 +64,7 @@ To run Spark interactively in a R interpreter, use `bin/sparkR`:
./bin/sparkR --master local[2]
Example applications are also provided in R. For example,
-
+
./bin/spark-submit examples/src/main/r/dataframe.R
# Launching on a Cluster
@@ -73,7 +73,6 @@ The Spark [cluster mode overview](cluster-overview.html) explains the key concep
Spark can run both by itself, or over several existing cluster managers. It currently provides several
options for deployment:
-* [Amazon EC2](ec2-scripts.html): our EC2 scripts let you launch a cluster in about 5 minutes
* [Standalone Deploy Mode](spark-standalone.html): simplest way to deploy Spark on a private cluster
* [Apache Mesos](running-on-mesos.html)
* [Hadoop YARN](running-on-yarn.html)
@@ -103,7 +102,7 @@ options for deployment:
* [Cluster Overview](cluster-overview.html): overview of concepts and components when running on a cluster
* [Submitting Applications](submitting-applications.html): packaging and deploying applications
* Deployment modes:
- * [Amazon EC2](ec2-scripts.html): scripts that let you launch a cluster on EC2 in about 5 minutes
+ * [Amazon EC2](https://github.com/amplab/spark-ec2): scripts that let you launch a cluster on EC2 in about 5 minutes
* [Standalone Deploy Mode](spark-standalone.html): launch a standalone cluster quickly without a third-party cluster manager
* [Mesos](running-on-mesos.html): deploy a private cluster using
[Apache Mesos](http://mesos.apache.org)
diff --git a/ec2/README b/ec2/README
deleted file mode 100644
index 72434f24bf..0000000000
--- a/ec2/README
+++ /dev/null
@@ -1,4 +0,0 @@
-This folder contains a script, spark-ec2, for launching Spark clusters on
-Amazon EC2. Usage instructions are available online at:
-
-http://spark.apache.org/docs/latest/ec2-scripts.html
diff --git a/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh b/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh
deleted file mode 100644
index 4f3e8da809..0000000000
--- a/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh
+++ /dev/null
@@ -1,34 +0,0 @@
-#!/usr/bin/env bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# These variables are automatically filled in by the spark-ec2 script.
-export MASTERS="{{master_list}}"
-export SLAVES="{{slave_list}}"
-export HDFS_DATA_DIRS="{{hdfs_data_dirs}}"
-export MAPRED_LOCAL_DIRS="{{mapred_local_dirs}}"
-export SPARK_LOCAL_DIRS="{{spark_local_dirs}}"
-export MODULES="{{modules}}"
-export SPARK_VERSION="{{spark_version}}"
-export TACHYON_VERSION="{{tachyon_version}}"
-export HADOOP_MAJOR_VERSION="{{hadoop_major_version}}"
-export SWAP_MB="{{swap}}"
-export SPARK_WORKER_INSTANCES="{{spark_worker_instances}}"
-export SPARK_MASTER_OPTS="{{spark_master_opts}}"
-export AWS_ACCESS_KEY_ID="{{aws_access_key_id}}"
-export AWS_SECRET_ACCESS_KEY="{{aws_secret_access_key}}"
diff --git a/ec2/spark-ec2 b/ec2/spark-ec2
deleted file mode 100755
index 26e7d22655..0000000000
--- a/ec2/spark-ec2
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/bin/sh
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Preserve the user's CWD so that relative paths are passed correctly to
-#+ the underlying Python script.
-SPARK_EC2_DIR="$(dirname "$0")"
-
-python -Wdefault "${SPARK_EC2_DIR}/spark_ec2.py" "$@"
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
deleted file mode 100755
index 19d5980560..0000000000
--- a/ec2/spark_ec2.py
+++ /dev/null
@@ -1,1530 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import division, print_function, with_statement
-
-import codecs
-import hashlib
-import itertools
-import logging
-import os
-import os.path
-import pipes
-import random
-import shutil
-import string
-from stat import S_IRUSR
-import subprocess
-import sys
-import tarfile
-import tempfile
-import textwrap
-import time
-import warnings
-from datetime import datetime
-from optparse import OptionParser
-from sys import stderr
-
-if sys.version < "3":
- from urllib2 import urlopen, Request, HTTPError
-else:
- from urllib.request import urlopen, Request
- from urllib.error import HTTPError
- raw_input = input
- xrange = range
-
-SPARK_EC2_VERSION = "1.6.0"
-SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))
-
-VALID_SPARK_VERSIONS = set([
- "0.7.3",
- "0.8.0",
- "0.8.1",
- "0.9.0",
- "0.9.1",
- "0.9.2",
- "1.0.0",
- "1.0.1",
- "1.0.2",
- "1.1.0",
- "1.1.1",
- "1.2.0",
- "1.2.1",
- "1.3.0",
- "1.3.1",
- "1.4.0",
- "1.4.1",
- "1.5.0",
- "1.5.1",
- "1.5.2",
- "1.6.0",
-])
-
-SPARK_TACHYON_MAP = {
- "1.0.0": "0.4.1",
- "1.0.1": "0.4.1",
- "1.0.2": "0.4.1",
- "1.1.0": "0.5.0",
- "1.1.1": "0.5.0",
- "1.2.0": "0.5.0",
- "1.2.1": "0.5.0",
- "1.3.0": "0.5.0",
- "1.3.1": "0.5.0",
- "1.4.0": "0.6.4",
- "1.4.1": "0.6.4",
- "1.5.0": "0.7.1",
- "1.5.1": "0.7.1",
- "1.5.2": "0.7.1",
- "1.6.0": "0.8.2",
-}
-
-DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION
-DEFAULT_SPARK_GITHUB_REPO = "https://github.com/apache/spark"
-
-# Default location to get the spark-ec2 scripts (and ami-list) from
-DEFAULT_SPARK_EC2_GITHUB_REPO = "https://github.com/amplab/spark-ec2"
-DEFAULT_SPARK_EC2_BRANCH = "branch-1.5"
-
-
-def setup_external_libs(libs):
- """
- Download external libraries from PyPI to SPARK_EC2_DIR/lib/ and prepend them to our PATH.
- """
- PYPI_URL_PREFIX = "https://pypi.python.org/packages/source"
- SPARK_EC2_LIB_DIR = os.path.join(SPARK_EC2_DIR, "lib")
-
- if not os.path.exists(SPARK_EC2_LIB_DIR):
- print("Downloading external libraries that spark-ec2 needs from PyPI to {path}...".format(
- path=SPARK_EC2_LIB_DIR
- ))
- print("This should be a one-time operation.")
- os.mkdir(SPARK_EC2_LIB_DIR)
-
- for lib in libs:
- versioned_lib_name = "{n}-{v}".format(n=lib["name"], v=lib["version"])
- lib_dir = os.path.join(SPARK_EC2_LIB_DIR, versioned_lib_name)
-
- if not os.path.isdir(lib_dir):
- tgz_file_path = os.path.join(SPARK_EC2_LIB_DIR, versioned_lib_name + ".tar.gz")
- print(" - Downloading {lib}...".format(lib=lib["name"]))
- download_stream = urlopen(
- "{prefix}/{first_letter}/{lib_name}/{lib_name}-{lib_version}.tar.gz".format(
- prefix=PYPI_URL_PREFIX,
- first_letter=lib["name"][:1],
- lib_name=lib["name"],
- lib_version=lib["version"]
- )
- )
- with open(tgz_file_path, "wb") as tgz_file:
- tgz_file.write(download_stream.read())
- with open(tgz_file_path, "rb") as tar:
- if hashlib.md5(tar.read()).hexdigest() != lib["md5"]:
- print("ERROR: Got wrong md5sum for {lib}.".format(lib=lib["name"]), file=stderr)
- sys.exit(1)
- tar = tarfile.open(tgz_file_path)
- tar.extractall(path=SPARK_EC2_LIB_DIR)
- tar.close()
- os.remove(tgz_file_path)
- print(" - Finished downloading {lib}.".format(lib=lib["name"]))
- sys.path.insert(1, lib_dir)
-
-
-# Only PyPI libraries are supported.
-external_libs = [
- {
- "name": "boto",
- "version": "2.34.0",
- "md5": "5556223d2d0cc4d06dd4829e671dcecd"
- }
-]
-
-setup_external_libs(external_libs)
-
-import boto
-from boto.ec2.blockdevicemapping import BlockDeviceMapping, BlockDeviceType, EBSBlockDeviceType
-from boto import ec2
-
-
-class UsageError(Exception):
- pass
-
-
-# Configure and parse our command-line arguments
-def parse_args():
- parser = OptionParser(
- prog="spark-ec2",
- version="%prog {v}".format(v=SPARK_EC2_VERSION),
- usage="%prog [options] <action> <cluster_name>\n\n"
- + "<action> can be: launch, destroy, login, stop, start, get-master, reboot-slaves")
-
- parser.add_option(
- "-s", "--slaves", type="int", default=1,
- help="Number of slaves to launch (default: %default)")
- parser.add_option(
- "-w", "--wait", type="int",
- help="DEPRECATED (no longer necessary) - Seconds to wait for nodes to start")
- parser.add_option(
- "-k", "--key-pair",
- help="Key pair to use on instances")
- parser.add_option(
- "-i", "--identity-file",
- help="SSH private key file to use for logging into instances")
- parser.add_option(
- "-p", "--profile", default=None,
- help="If you have multiple profiles (AWS or boto config), you can configure " +
- "additional, named profiles by using this option (default: %default)")
- parser.add_option(
- "-t", "--instance-type", default="m1.large",
- help="Type of instance to launch (default: %default). " +
- "WARNING: must be 64-bit; small instances won't work")
- parser.add_option(
- "-m", "--master-instance-type", default="",
- help="Master instance type (leave empty for same as instance-type)")
- parser.add_option(
- "-r", "--region", default="us-east-1",
- help="EC2 region used to launch instances in, or to find them in (default: %default)")
- parser.add_option(
- "-z", "--zone", default="",
- help="Availability zone to launch instances in, or 'all' to spread " +
- "slaves across multiple (an additional $0.01/Gb for bandwidth" +
- "between zones applies) (default: a single zone chosen at random)")
- parser.add_option(
- "-a", "--ami",
- help="Amazon Machine Image ID to use")
- parser.add_option(
- "-v", "--spark-version", default=DEFAULT_SPARK_VERSION,
- help="Version of Spark to use: 'X.Y.Z' or a specific git hash (default: %default)")
- parser.add_option(
- "--spark-git-repo",
- default=DEFAULT_SPARK_GITHUB_REPO,
- help="Github repo from which to checkout supplied commit hash (default: %default)")
- parser.add_option(
- "--spark-ec2-git-repo",
- default=DEFAULT_SPARK_EC2_GITHUB_REPO,
- help="Github repo from which to checkout spark-ec2 (default: %default)")
- parser.add_option(
- "--spark-ec2-git-branch",
- default=DEFAULT_SPARK_EC2_BRANCH,
- help="Github repo branch of spark-ec2 to use (default: %default)")
- parser.add_option(
- "--deploy-root-dir",
- default=None,
- help="A directory to copy into / on the first master. " +
- "Must be absolute. Note that a trailing slash is handled as per rsync: " +
- "If you omit it, the last directory of the --deploy-root-dir path will be created " +
- "in / before copying its contents. If you append the trailing slash, " +
- "the directory is not created and its contents are copied directly into /. " +
- "(default: %default).")
- parser.add_option(
- "--hadoop-major-version", default="1",
- help="Major version of Hadoop. Valid options are 1 (Hadoop 1.0.4), 2 (CDH 4.2.0), yarn " +
- "(Hadoop 2.4.0) (default: %default)")
- parser.add_option(
- "-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
- help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
- "the given local address (for use with login)")
- parser.add_option(
- "--resume", action="store_true", default=False,
- help="Resume installation on a previously launched cluster " +
- "(for debugging)")
- parser.add_option(
- "--ebs-vol-size", metavar="SIZE", type="int", default=0,
- help="Size (in GB) of each EBS volume.")
- parser.add_option(
- "--ebs-vol-type", default="standard",
- help="EBS volume type (e.g. 'gp2', 'standard').")
- parser.add_option(
- "--ebs-vol-num", type="int", default=1,
- help="Number of EBS volumes to attach to each node as /vol[x]. " +
- "The volumes will be deleted when the instances terminate. " +
- "Only possible on EBS-backed AMIs. " +
- "EBS volumes are only attached if --ebs-vol-size > 0. " +
- "Only support up to 8 EBS volumes.")
- parser.add_option(
- "--placement-group", type="string", default=None,
- help="Which placement group to try and launch " +
- "instances into. Assumes placement group is already " +
- "created.")
- parser.add_option(
- "--swap", metavar="SWAP", type="int", default=1024,
- help="Swap space to set up per node, in MB (default: %default)")
- parser.add_option(
- "--spot-price", metavar="PRICE", type="float",
- help="If specified, launch slaves as spot instances with the given " +
- "maximum price (in dollars)")
- parser.add_option(
- "--ganglia", action="store_true", default=True,
- help="Setup Ganglia monitoring on cluster (default: %default). NOTE: " +
- "the Ganglia page will be publicly accessible")
- parser.add_option(
- "--no-ganglia", action="store_false", dest="ganglia",
- help="Disable Ganglia monitoring for the cluster")
- parser.add_option(
- "-u", "--user", default="root",
- help="The SSH user you want to connect as (default: %default)")
- parser.add_option(
- "--delete-groups", action="store_true", default=False,
- help="When destroying a cluster, delete the security groups that were created")
- parser.add_option(
- "--use-existing-master", action="store_true", default=False,
- help="Launch fresh slaves, but use an existing stopped master if possible")
- parser.add_option(
- "--worker-instances", type="int", default=1,
- help="Number of instances per worker: variable SPARK_WORKER_INSTANCES. Not used if YARN " +
- "is used as Hadoop major version (default: %default)")
- parser.add_option(
- "--master-opts", type="string", default="",
- help="Extra options to give to master through SPARK_MASTER_OPTS variable " +
- "(e.g -Dspark.worker.timeout=180)")
- parser.add_option(
- "--user-data", type="string", default="",
- help="Path to a user-data file (most AMIs interpret this as an initialization script)")
- parser.add_option(
- "--authorized-address", type="string", default="0.0.0.0/0",
- help="Address to authorize on created security groups (default: %default)")
- parser.add_option(
- "--additional-security-group", type="string", default="",
- help="Additional security group to place the machines in")
- parser.add_option(
- "--additional-tags", type="string", default="",
- help="Additional tags to set on the machines; tags are comma-separated, while name and " +
- "value are colon separated; ex: \"Task:MySparkProject,Env:production\"")
- parser.add_option(
- "--copy-aws-credentials", action="store_true", default=False,
- help="Add AWS credentials to hadoop configuration to allow Spark to access S3")
- parser.add_option(
- "--subnet-id", default=None,
- help="VPC subnet to launch instances in")
- parser.add_option(
- "--vpc-id", default=None,
- help="VPC to launch instances in")
- parser.add_option(
- "--private-ips", action="store_true", default=False,
- help="Use private IPs for instances rather than public if VPC/subnet " +
- "requires that.")
- parser.add_option(
- "--instance-initiated-shutdown-behavior", default="stop",
- choices=["stop", "terminate"],
- help="Whether instances should terminate when shut down or just stop")
- parser.add_option(
- "--instance-profile-name", default=None,
- help="IAM profile name to launch instances under")
-
- (opts, args) = parser.parse_args()
- if len(args) != 2:
- parser.print_help()
- sys.exit(1)
- (action, cluster_name) = args
-
- # Boto config check
- # http://boto.cloudhackers.com/en/latest/boto_config_tut.html
- home_dir = os.getenv('HOME')
- if home_dir is None or not os.path.isfile(home_dir + '/.boto'):
- if not os.path.isfile('/etc/boto.cfg'):
- # If there is no boto config, check aws credentials
- if not os.path.isfile(home_dir + '/.aws/credentials'):
- if os.getenv('AWS_ACCESS_KEY_ID') is None:
- print("ERROR: The environment variable AWS_ACCESS_KEY_ID must be set",
- file=stderr)
- sys.exit(1)
- if os.getenv('AWS_SECRET_ACCESS_KEY') is None:
- print("ERROR: The environment variable AWS_SECRET_ACCESS_KEY must be set",
- file=stderr)
- sys.exit(1)
- return (opts, action, cluster_name)
-
-
-# Get the EC2 security group of the given name, creating it if it doesn't exist
-def get_or_make_group(conn, name, vpc_id):
- groups = conn.get_all_security_groups()
- group = [g for g in groups if g.name == name]
- if len(group) > 0:
- return group[0]
- else:
- print("Creating security group " + name)
- return conn.create_security_group(name, "Spark EC2 group", vpc_id)
-
-
-def get_validate_spark_version(version, repo):
- if "." in version:
- version = version.replace("v", "")
- if version not in VALID_SPARK_VERSIONS:
- print("Don't know about Spark version: {v}".format(v=version), file=stderr)
- sys.exit(1)
- return version
- else:
- github_commit_url = "{repo}/commit/{commit_hash}".format(repo=repo, commit_hash=version)
- request = Request(github_commit_url)
- request.get_method = lambda: 'HEAD'
- try:
- response = urlopen(request)
- except HTTPError as e:
- print("Couldn't validate Spark commit: {url}".format(url=github_commit_url),
- file=stderr)
- print("Received HTTP response code of {code}.".format(code=e.code), file=stderr)
- sys.exit(1)
- return version
-
-
-# Source: http://aws.amazon.com/amazon-linux-ami/instance-type-matrix/
-# Last Updated: 2015-06-19
-# For easy maintainability, please keep this manually-inputted dictionary sorted by key.
-EC2_INSTANCE_TYPES = {
- "c1.medium": "pvm",
- "c1.xlarge": "pvm",
- "c3.large": "pvm",
- "c3.xlarge": "pvm",
- "c3.2xlarge": "pvm",
- "c3.4xlarge": "pvm",
- "c3.8xlarge": "pvm",
- "c4.large": "hvm",
- "c4.xlarge": "hvm",
- "c4.2xlarge": "hvm",
- "c4.4xlarge": "hvm",
- "c4.8xlarge": "hvm",
- "cc1.4xlarge": "hvm",
- "cc2.8xlarge": "hvm",
- "cg1.4xlarge": "hvm",
- "cr1.8xlarge": "hvm",
- "d2.xlarge": "hvm",
- "d2.2xlarge": "hvm",
- "d2.4xlarge": "hvm",
- "d2.8xlarge": "hvm",
- "g2.2xlarge": "hvm",
- "g2.8xlarge": "hvm",
- "hi1.4xlarge": "pvm",
- "hs1.8xlarge": "pvm",
- "i2.xlarge": "hvm",
- "i2.2xlarge": "hvm",
- "i2.4xlarge": "hvm",
- "i2.8xlarge": "hvm",
- "m1.small": "pvm",
- "m1.medium": "pvm",
- "m1.large": "pvm",
- "m1.xlarge": "pvm",
- "m2.xlarge": "pvm",
- "m2.2xlarge": "pvm",
- "m2.4xlarge": "pvm",
- "m3.medium": "hvm",
- "m3.large": "hvm",
- "m3.xlarge": "hvm",
- "m3.2xlarge": "hvm",
- "m4.large": "hvm",
- "m4.xlarge": "hvm",
- "m4.2xlarge": "hvm",
- "m4.4xlarge": "hvm",
- "m4.10xlarge": "hvm",
- "r3.large": "hvm",
- "r3.xlarge": "hvm",
- "r3.2xlarge": "hvm",
- "r3.4xlarge": "hvm",
- "r3.8xlarge": "hvm",
- "t1.micro": "pvm",
- "t2.micro": "hvm",
- "t2.small": "hvm",
- "t2.medium": "hvm",
- "t2.large": "hvm",
-}
-
-
-def get_tachyon_version(spark_version):
- return SPARK_TACHYON_MAP.get(spark_version, "")
-
-
-# Attempt to resolve an appropriate AMI given the architecture and region of the request.
-def get_spark_ami(opts):
- if opts.instance_type in EC2_INSTANCE_TYPES:
- instance_type = EC2_INSTANCE_TYPES[opts.instance_type]
- else:
- instance_type = "pvm"
- print("Don't recognize %s, assuming type is pvm" % opts.instance_type, file=stderr)
-
- # URL prefix from which to fetch AMI information
- ami_prefix = "{r}/{b}/ami-list".format(
- r=opts.spark_ec2_git_repo.replace("https://github.com", "https://raw.github.com", 1),
- b=opts.spark_ec2_git_branch)
-
- ami_path = "%s/%s/%s" % (ami_prefix, opts.region, instance_type)
- reader = codecs.getreader("ascii")
- try:
- ami = reader(urlopen(ami_path)).read().strip()
- except:
- print("Could not resolve AMI at: " + ami_path, file=stderr)
- sys.exit(1)
-
- print("Spark AMI: " + ami)
- return ami
-
-
-# Launch a cluster of the given name, by setting up its security groups,
-# and then starting new instances in them.
-# Returns a tuple of EC2 reservation objects for the master and slaves
-# Fails if there already instances running in the cluster's groups.
-def launch_cluster(conn, opts, cluster_name):
- if opts.identity_file is None:
- print("ERROR: Must provide an identity file (-i) for ssh connections.", file=stderr)
- sys.exit(1)
-
- if opts.key_pair is None:
- print("ERROR: Must provide a key pair name (-k) to use on instances.", file=stderr)
- sys.exit(1)
-
- user_data_content = None
- if opts.user_data:
- with open(opts.user_data) as user_data_file:
- user_data_content = user_data_file.read()
-
- print("Setting up security groups...")
- master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id)
- slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id)
- authorized_address = opts.authorized_address
- if master_group.rules == []: # Group was just now created
- if opts.vpc_id is None:
- master_group.authorize(src_group=master_group)
- master_group.authorize(src_group=slave_group)
- else:
- master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
- src_group=master_group)
- master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
- src_group=master_group)
- master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
- src_group=master_group)
- master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
- src_group=slave_group)
- master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
- src_group=slave_group)
- master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
- src_group=slave_group)
- master_group.authorize('tcp', 22, 22, authorized_address)
- master_group.authorize('tcp', 8080, 8081, authorized_address)
- master_group.authorize('tcp', 18080, 18080, authorized_address)
- master_group.authorize('tcp', 19999, 19999, authorized_address)
- master_group.authorize('tcp', 50030, 50030, authorized_address)
- master_group.authorize('tcp', 50070, 50070, authorized_address)
- master_group.authorize('tcp', 60070, 60070, authorized_address)
- master_group.authorize('tcp', 4040, 4045, authorized_address)
- # Rstudio (GUI for R) needs port 8787 for web access
- master_group.authorize('tcp', 8787, 8787, authorized_address)
- # HDFS NFS gateway requires 111,2049,4242 for tcp & udp
- master_group.authorize('tcp', 111, 111, authorized_address)
- master_group.authorize('udp', 111, 111, authorized_address)
- master_group.authorize('tcp', 2049, 2049, authorized_address)
- master_group.authorize('udp', 2049, 2049, authorized_address)
- master_group.authorize('tcp', 4242, 4242, authorized_address)
- master_group.authorize('udp', 4242, 4242, authorized_address)
- # RM in YARN mode uses 8088
- master_group.authorize('tcp', 8088, 8088, authorized_address)
- if opts.ganglia:
- master_group.authorize('tcp', 5080, 5080, authorized_address)
- if slave_group.rules == []: # Group was just now created
- if opts.vpc_id is None:
- slave_group.authorize(src_group=master_group)
- slave_group.authorize(src_group=slave_group)
- else:
- slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
- src_group=master_group)
- slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
- src_group=master_group)
- slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
- src_group=master_group)
- slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
- src_group=slave_group)
- slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
- src_group=slave_group)
- slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
- src_group=slave_group)
- slave_group.authorize('tcp', 22, 22, authorized_address)
- slave_group.authorize('tcp', 8080, 8081, authorized_address)
- slave_group.authorize('tcp', 50060, 50060, authorized_address)
- slave_group.authorize('tcp', 50075, 50075, authorized_address)
- slave_group.authorize('tcp', 60060, 60060, authorized_address)
- slave_group.authorize('tcp', 60075, 60075, authorized_address)
-
- # Check if instances are already running in our groups
- existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
- die_on_error=False)
- if existing_slaves or (existing_masters and not opts.use_existing_master):
- print("ERROR: There are already instances running in group %s or %s" %
- (master_group.name, slave_group.name), file=stderr)
- sys.exit(1)
-
- # Figure out Spark AMI
- if opts.ami is None:
- opts.ami = get_spark_ami(opts)
-
- # we use group ids to work around https://github.com/boto/boto/issues/350
- additional_group_ids = []
- if opts.additional_security_group:
- additional_group_ids = [sg.id
- for sg in conn.get_all_security_groups()
- if opts.additional_security_group in (sg.name, sg.id)]
- print("Launching instances...")
-
- try:
- image = conn.get_all_images(image_ids=[opts.ami])[0]
- except:
- print("Could not find AMI " + opts.ami, file=stderr)
- sys.exit(1)
-
- # Create block device mapping so that we can add EBS volumes if asked to.
- # The first drive is attached as /dev/sds, 2nd as /dev/sdt, ... /dev/sdz
- block_map = BlockDeviceMapping()
- if opts.ebs_vol_size > 0:
- for i in range(opts.ebs_vol_num):
- device = EBSBlockDeviceType()
- device.size = opts.ebs_vol_size
- device.volume_type = opts.ebs_vol_type
- device.delete_on_termination = True
- block_map["/dev/sd" + chr(ord('s') + i)] = device
-
- # AWS ignores the AMI-specified block device mapping for M3 (see SPARK-3342).
- if opts.instance_type.startswith('m3.'):
- for i in range(get_num_disks(opts.instance_type)):
- dev = BlockDeviceType()
- dev.ephemeral_name = 'ephemeral%d' % i
- # The first ephemeral drive is /dev/sdb.
- name = '/dev/sd' + string.ascii_letters[i + 1]
- block_map[name] = dev
-
- # Launch slaves
- if opts.spot_price is not None:
- # Launch spot instances with the requested price
- print("Requesting %d slaves as spot instances with price $%.3f" %
- (opts.slaves, opts.spot_price))
- zones = get_zones(conn, opts)
- num_zones = len(zones)
- i = 0
- my_req_ids = []
- for zone in zones:
- num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
- slave_reqs = conn.request_spot_instances(
- price=opts.spot_price,
- image_id=opts.ami,
- launch_group="launch-group-%s" % cluster_name,
- placement=zone,
- count=num_slaves_this_zone,
- key_name=opts.key_pair,
- security_group_ids=[slave_group.id] + additional_group_ids,
- instance_type=opts.instance_type,
- block_device_map=block_map,
- subnet_id=opts.subnet_id,
- placement_group=opts.placement_group,
- user_data=user_data_content,
- instance_profile_name=opts.instance_profile_name)
- my_req_ids += [req.id for req in slave_reqs]
- i += 1
-
- print("Waiting for spot instances to be granted...")
- try:
- while True:
- time.sleep(10)
- reqs = conn.get_all_spot_instance_requests()
- id_to_req = {}
- for r in reqs:
- id_to_req[r.id] = r
- active_instance_ids = []
- for i in my_req_ids:
- if i in id_to_req and id_to_req[i].state == "active":
- active_instance_ids.append(id_to_req[i].instance_id)
- if len(active_instance_ids) == opts.slaves:
- print("All %d slaves granted" % opts.slaves)
- reservations = conn.get_all_reservations(active_instance_ids)
- slave_nodes = []
- for r in reservations:
- slave_nodes += r.instances
- break
- else:
- print("%d of %d slaves granted, waiting longer" % (
- len(active_instance_ids), opts.slaves))
- except:
- print("Canceling spot instance requests")
- conn.cancel_spot_instance_requests(my_req_ids)
- # Log a warning if any of these requests actually launched instances:
- (master_nodes, slave_nodes) = get_existing_cluster(
- conn, opts, cluster_name, die_on_error=False)
- running = len(master_nodes) + len(slave_nodes)
- if running:
- print(("WARNING: %d instances are still running" % running), file=stderr)
- sys.exit(0)
- else:
- # Launch non-spot instances
- zones = get_zones(conn, opts)
- num_zones = len(zones)
- i = 0
- slave_nodes = []
- for zone in zones:
- num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
- if num_slaves_this_zone > 0:
- slave_res = image.run(
- key_name=opts.key_pair,
- security_group_ids=[slave_group.id] + additional_group_ids,
- instance_type=opts.instance_type,
- placement=zone,
- min_count=num_slaves_this_zone,
- max_count=num_slaves_this_zone,
- block_device_map=block_map,
- subnet_id=opts.subnet_id,
- placement_group=opts.placement_group,
- user_data=user_data_content,
- instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior,
- instance_profile_name=opts.instance_profile_name)
- slave_nodes += slave_res.instances
- print("Launched {s} slave{plural_s} in {z}, regid = {r}".format(
- s=num_slaves_this_zone,
- plural_s=('' if num_slaves_this_zone == 1 else 's'),
- z=zone,
- r=slave_res.id))
- i += 1
-
- # Launch or resume masters
- if existing_masters:
- print("Starting master...")
- for inst in existing_masters:
- if inst.state not in ["shutting-down", "terminated"]:
- inst.start()
- master_nodes = existing_masters
- else:
- master_type = opts.master_instance_type
- if master_type == "":
- master_type = opts.instance_type
- if opts.zone == 'all':
- opts.zone = random.choice(conn.get_all_zones()).name
- master_res = image.run(
- key_name=opts.key_pair,
- security_group_ids=[master_group.id] + additional_group_ids,
- instance_type=master_type,
- placement=opts.zone,
- min_count=1,
- max_count=1,
- block_device_map=block_map,
- subnet_id=opts.subnet_id,
- placement_group=opts.placement_group,
- user_data=user_data_content,
- instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior,
- instance_profile_name=opts.instance_profile_name)
-
- master_nodes = master_res.instances
- print("Launched master in %s, regid = %s" % (zone, master_res.id))
-
- # This wait time corresponds to SPARK-4983
- print("Waiting for AWS to propagate instance metadata...")
- time.sleep(15)
-
- # Give the instances descriptive names and set additional tags
- additional_tags = {}
- if opts.additional_tags.strip():
- additional_tags = dict(
- map(str.strip, tag.split(':', 1)) for tag in opts.additional_tags.split(',')
- )
-
- for master in master_nodes:
- master.add_tags(
- dict(additional_tags, Name='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id))
- )
-
- for slave in slave_nodes:
- slave.add_tags(
- dict(additional_tags, Name='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id))
- )
-
- # Return all the instances
- return (master_nodes, slave_nodes)
-
-
-def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
- """
- Get the EC2 instances in an existing cluster if available.
- Returns a tuple of lists of EC2 instance objects for the masters and slaves.
- """
- print("Searching for existing cluster {c} in region {r}...".format(
- c=cluster_name, r=opts.region))
-
- def get_instances(group_names):
- """
- Get all non-terminated instances that belong to any of the provided security groups.
-
- EC2 reservation filters and instance states are documented here:
- http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html#options
- """
- reservations = conn.get_all_reservations(
- filters={"instance.group-name": group_names})
- instances = itertools.chain.from_iterable(r.instances for r in reservations)
- return [i for i in instances if i.state not in ["shutting-down", "terminated"]]
-
- master_instances = get_instances([cluster_name + "-master"])
- slave_instances = get_instances([cluster_name + "-slaves"])
-
- if any((master_instances, slave_instances)):
- print("Found {m} master{plural_m}, {s} slave{plural_s}.".format(
- m=len(master_instances),
- plural_m=('' if len(master_instances) == 1 else 's'),
- s=len(slave_instances),
- plural_s=('' if len(slave_instances) == 1 else 's')))
-
- if not master_instances and die_on_error:
- print("ERROR: Could not find a master for cluster {c} in region {r}.".format(
- c=cluster_name, r=opts.region), file=sys.stderr)
- sys.exit(1)
-
- return (master_instances, slave_instances)
-
-
-# Deploy configuration files and run setup scripts on a newly launched
-# or started EC2 cluster.
-def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
- master = get_dns_name(master_nodes[0], opts.private_ips)
- if deploy_ssh_key:
- print("Generating cluster's SSH key on master...")
- key_setup = """
- [ -f ~/.ssh/id_rsa ] ||
- (ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa &&
- cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)
- """
- ssh(master, opts, key_setup)
- dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
- print("Transferring cluster's SSH key to slaves...")
- for slave in slave_nodes:
- slave_address = get_dns_name(slave, opts.private_ips)
- print(slave_address)
- ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar)
-
- modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
- 'mapreduce', 'spark-standalone', 'tachyon', 'rstudio']
-
- if opts.hadoop_major_version == "1":
- modules = list(filter(lambda x: x != "mapreduce", modules))
-
- if opts.ganglia:
- modules.append('ganglia')
-
- # Clear SPARK_WORKER_INSTANCES if running on YARN
- if opts.hadoop_major_version == "yarn":
- opts.worker_instances = ""
-
- # NOTE: We should clone the repository before running deploy_files to
- # prevent ec2-variables.sh from being overwritten
- print("Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format(
- r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch))
- ssh(
- host=master,
- opts=opts,
- command="rm -rf spark-ec2"
- + " && "
- + "git clone {r} -b {b} spark-ec2".format(r=opts.spark_ec2_git_repo,
- b=opts.spark_ec2_git_branch)
- )
-
- print("Deploying files to master...")
- deploy_files(
- conn=conn,
- root_dir=SPARK_EC2_DIR + "/" + "deploy.generic",
- opts=opts,
- master_nodes=master_nodes,
- slave_nodes=slave_nodes,
- modules=modules
- )
-
- if opts.deploy_root_dir is not None:
- print("Deploying {s} to master...".format(s=opts.deploy_root_dir))
- deploy_user_files(
- root_dir=opts.deploy_root_dir,
- opts=opts,
- master_nodes=master_nodes
- )
-
- print("Running setup on master...")
- setup_spark_cluster(master, opts)
- print("Done!")
-
-
-def setup_spark_cluster(master, opts):
- ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
- ssh(master, opts, "spark-ec2/setup.sh")
- print("Spark standalone cluster started at http://%s:8080" % master)
-
- if opts.ganglia:
- print("Ganglia started at http://%s:5080/ganglia" % master)
-
-
-def is_ssh_available(host, opts, print_ssh_output=True):
- """
- Check if SSH is available on a host.
- """
- s = subprocess.Popen(
- ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
- '%s@%s' % (opts.user, host), stringify_command('true')],
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT # we pipe stderr through stdout to preserve output order
- )
- cmd_output = s.communicate()[0] # [1] is stderr, which we redirected to stdout
-
- if s.returncode != 0 and print_ssh_output:
- # extra leading newline is for spacing in wait_for_cluster_state()
- print(textwrap.dedent("""\n
- Warning: SSH connection error. (This could be temporary.)
- Host: {h}
- SSH return code: {r}
- SSH output: {o}
- """).format(
- h=host,
- r=s.returncode,
- o=cmd_output.strip()
- ))
-
- return s.returncode == 0
-
-
-def is_cluster_ssh_available(cluster_instances, opts):
- """
- Check if SSH is available on all the instances in a cluster.
- """
- for i in cluster_instances:
- dns_name = get_dns_name(i, opts.private_ips)
- if not is_ssh_available(host=dns_name, opts=opts):
- return False
- else:
- return True
-
-
-def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
- """
- Wait for all the instances in the cluster to reach a designated state.
-
- cluster_instances: a list of boto.ec2.instance.Instance
- cluster_state: a string representing the desired state of all the instances in the cluster
- value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
- 'running', 'terminated', etc.
- (would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
- """
- sys.stdout.write(
- "Waiting for cluster to enter '{s}' state.".format(s=cluster_state)
- )
- sys.stdout.flush()
-
- start_time = datetime.now()
- num_attempts = 0
-
- while True:
- time.sleep(5 * num_attempts) # seconds
-
- for i in cluster_instances:
- i.update()
-
- max_batch = 100
- statuses = []
- for j in xrange(0, len(cluster_instances), max_batch):
- batch = [i.id for i in cluster_instances[j:j + max_batch]]
- statuses.extend(conn.get_all_instance_status(instance_ids=batch))
-
- if cluster_state == 'ssh-ready':
- if all(i.state == 'running' for i in cluster_instances) and \
- all(s.system_status.status == 'ok' for s in statuses) and \
- all(s.instance_status.status == 'ok' for s in statuses) and \
- is_cluster_ssh_available(cluster_instances, opts):
- break
- else:
- if all(i.state == cluster_state for i in cluster_instances):
- break
-
- num_attempts += 1
-
- sys.stdout.write(".")
- sys.stdout.flush()
-
- sys.stdout.write("\n")
-
- end_time = datetime.now()
- print("Cluster is now in '{s}' state. Waited {t} seconds.".format(
- s=cluster_state,
- t=(end_time - start_time).seconds
- ))
-
-
-# Get number of local disks available for a given EC2 instance type.
-def get_num_disks(instance_type):
- # Source: http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/InstanceStorage.html
- # Last Updated: 2015-06-19
- # For easy maintainability, please keep this manually-inputted dictionary sorted by key.
- disks_by_instance = {
- "c1.medium": 1,
- "c1.xlarge": 4,
- "c3.large": 2,
- "c3.xlarge": 2,
- "c3.2xlarge": 2,
- "c3.4xlarge": 2,
- "c3.8xlarge": 2,
- "c4.large": 0,
- "c4.xlarge": 0,
- "c4.2xlarge": 0,
- "c4.4xlarge": 0,
- "c4.8xlarge": 0,
- "cc1.4xlarge": 2,
- "cc2.8xlarge": 4,
- "cg1.4xlarge": 2,
- "cr1.8xlarge": 2,
- "d2.xlarge": 3,
- "d2.2xlarge": 6,
- "d2.4xlarge": 12,
- "d2.8xlarge": 24,
- "g2.2xlarge": 1,
- "g2.8xlarge": 2,
- "hi1.4xlarge": 2,
- "hs1.8xlarge": 24,
- "i2.xlarge": 1,
- "i2.2xlarge": 2,
- "i2.4xlarge": 4,
- "i2.8xlarge": 8,
- "m1.small": 1,
- "m1.medium": 1,
- "m1.large": 2,
- "m1.xlarge": 4,
- "m2.xlarge": 1,
- "m2.2xlarge": 1,
- "m2.4xlarge": 2,
- "m3.medium": 1,
- "m3.large": 1,
- "m3.xlarge": 2,
- "m3.2xlarge": 2,
- "m4.large": 0,
- "m4.xlarge": 0,
- "m4.2xlarge": 0,
- "m4.4xlarge": 0,
- "m4.10xlarge": 0,
- "r3.large": 1,
- "r3.xlarge": 1,
- "r3.2xlarge": 1,
- "r3.4xlarge": 1,
- "r3.8xlarge": 2,
- "t1.micro": 0,
- "t2.micro": 0,
- "t2.small": 0,
- "t2.medium": 0,
- "t2.large": 0,
- }
- if instance_type in disks_by_instance:
- return disks_by_instance[instance_type]
- else:
- print("WARNING: Don't know number of disks on instance type %s; assuming 1"
- % instance_type, file=stderr)
- return 1
-
-
-# Deploy the configuration file templates in a given local directory to
-# a cluster, filling in any template parameters with information about the
-# cluster (e.g. lists of masters and slaves). Files are only deployed to
-# the first master instance in the cluster, and we expect the setup
-# script to be run on that instance to copy them to other nodes.
-#
-# root_dir should be an absolute path to the directory with the files we want to deploy.
-def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
- active_master = get_dns_name(master_nodes[0], opts.private_ips)
-
- num_disks = get_num_disks(opts.instance_type)
- hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
- mapred_local_dirs = "/mnt/hadoop/mrlocal"
- spark_local_dirs = "/mnt/spark"
- if num_disks > 1:
- for i in range(2, num_disks + 1):
- hdfs_data_dirs += ",/mnt%d/ephemeral-hdfs/data" % i
- mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
- spark_local_dirs += ",/mnt%d/spark" % i
-
- cluster_url = "%s:7077" % active_master
-
- if "." in opts.spark_version:
- # Pre-built Spark deploy
- spark_v = get_validate_spark_version(opts.spark_version, opts.spark_git_repo)
- tachyon_v = get_tachyon_version(spark_v)
- else:
- # Spark-only custom deploy
- spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version)
- tachyon_v = ""
- print("Deploying Spark via git hash; Tachyon won't be set up")
- modules = filter(lambda x: x != "tachyon", modules)
-
- master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes]
- slave_addresses = [get_dns_name(i, opts.private_ips) for i in slave_nodes]
- worker_instances_str = "%d" % opts.worker_instances if opts.worker_instances else ""
- template_vars = {
- "master_list": '\n'.join(master_addresses),
- "active_master": active_master,
- "slave_list": '\n'.join(slave_addresses),
- "cluster_url": cluster_url,
- "hdfs_data_dirs": hdfs_data_dirs,
- "mapred_local_dirs": mapred_local_dirs,
- "spark_local_dirs": spark_local_dirs,
- "swap": str(opts.swap),
- "modules": '\n'.join(modules),
- "spark_version": spark_v,
- "tachyon_version": tachyon_v,
- "hadoop_major_version": opts.hadoop_major_version,
- "spark_worker_instances": worker_instances_str,
- "spark_master_opts": opts.master_opts
- }
-
- if opts.copy_aws_credentials:
- template_vars["aws_access_key_id"] = conn.aws_access_key_id
- template_vars["aws_secret_access_key"] = conn.aws_secret_access_key
- else:
- template_vars["aws_access_key_id"] = ""
- template_vars["aws_secret_access_key"] = ""
-
- # Create a temp directory in which we will place all the files to be
- # deployed after we substitue template parameters in them
- tmp_dir = tempfile.mkdtemp()
- for path, dirs, files in os.walk(root_dir):
- if path.find(".svn") == -1:
- dest_dir = os.path.join('/', path[len(root_dir):])
- local_dir = tmp_dir + dest_dir
- if not os.path.exists(local_dir):
- os.makedirs(local_dir)
- for filename in files:
- if filename[0] not in '#.~' and filename[-1] != '~':
- dest_file = os.path.join(dest_dir, filename)
- local_file = tmp_dir + dest_file
- with open(os.path.join(path, filename)) as src:
- with open(local_file, "w") as dest:
- text = src.read()
- for key in template_vars:
- text = text.replace("{{" + key + "}}", template_vars[key])
- dest.write(text)
- dest.close()
- # rsync the whole directory over to the master machine
- command = [
- 'rsync', '-rv',
- '-e', stringify_command(ssh_command(opts)),
- "%s/" % tmp_dir,
- "%s@%s:/" % (opts.user, active_master)
- ]
- subprocess.check_call(command)
- # Remove the temp directory we created above
- shutil.rmtree(tmp_dir)
-
-
-# Deploy a given local directory to a cluster, WITHOUT parameter substitution.
-# Note that unlike deploy_files, this works for binary files.
-# Also, it is up to the user to add (or not) the trailing slash in root_dir.
-# Files are only deployed to the first master instance in the cluster.
-#
-# root_dir should be an absolute path.
-def deploy_user_files(root_dir, opts, master_nodes):
- active_master = get_dns_name(master_nodes[0], opts.private_ips)
- command = [
- 'rsync', '-rv',
- '-e', stringify_command(ssh_command(opts)),
- "%s" % root_dir,
- "%s@%s:/" % (opts.user, active_master)
- ]
- subprocess.check_call(command)
-
-
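-# Return the command as a single string, shell-quoting and joining it if it was given as a list.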
-def stringify_command(parts):
- if isinstance(parts, str):
- return parts
- else:
- return ' '.join(map(pipes.quote, parts))
-
-
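-# Common ssh options: skip strict host key checking and, if one was given, use the identity file.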
-def ssh_args(opts):
- parts = ['-o', 'StrictHostKeyChecking=no']
- parts += ['-o', 'UserKnownHostsFile=/dev/null']
- if opts.identity_file is not None:
- parts += ['-i', opts.identity_file]
- return parts
-
-
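-# The base ssh command, as an argument list, with the common options applied.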
-def ssh_command(opts):
- return ['ssh'] + ssh_args(opts)
-
-
-# Run a command on a host through ssh, retrying up to five times
-# and then throwing an exception if ssh continues to fail.
-def ssh(host, opts, command):
- tries = 0
- while True:
- try:
- return subprocess.check_call(
- ssh_command(opts) + ['-t', '-t', '%s@%s' % (opts.user, host),
- stringify_command(command)])
- except subprocess.CalledProcessError as e:
- if tries > 5:
- # If this was an ssh failure, provide the user with hints.
- if e.returncode == 255:
- raise UsageError(
- "Failed to SSH to remote host {0}.\n"
- "Please check that you have provided the correct --identity-file and "
- "--key-pair parameters and try again.".format(host))
- else:
- raise e
- print("Error executing remote command, retrying after 30 seconds: {0}".format(e),
- file=stderr)
- time.sleep(30)
- tries = tries + 1
-
-
-# Backported from Python 2.7 for compatibility with 2.6 (See SPARK-1990)
-def _check_output(*popenargs, **kwargs):
- if 'stdout' in kwargs:
- raise ValueError('stdout argument not allowed, it will be overridden.')
- process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
- output, unused_err = process.communicate()
- retcode = process.poll()
- if retcode:
- cmd = kwargs.get("args")
- if cmd is None:
- cmd = popenargs[0]
- raise subprocess.CalledProcessError(retcode, cmd, output=output)
- return output
-
-
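-# Run a command on a host through ssh and return its standard output.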
-def ssh_read(host, opts, command):
- return _check_output(
- ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)])
-
-
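-# Run a command on a host through ssh, writing the given arguments to its stdin;
-# retry on failure and give up after repeated errors.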
-def ssh_write(host, opts, command, arguments):
- tries = 0
- while True:
- proc = subprocess.Popen(
- ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)],
- stdin=subprocess.PIPE)
- proc.stdin.write(arguments)
- proc.stdin.close()
- status = proc.wait()
- if status == 0:
- break
- elif tries > 5:
- raise RuntimeError("ssh_write failed with error %s" % proc.returncode)
- else:
- print("Error {0} while executing remote command, retrying after 30 seconds".
- format(status), file=stderr)
- time.sleep(30)
- tries = tries + 1
-
-
-# Gets a list of zones to launch instances in
-def get_zones(conn, opts):
- if opts.zone == 'all':
- zones = [z.name for z in conn.get_all_zones()]
- else:
- zones = [opts.zone]
- return zones
-
-
-# Gets the number of slaves to place in the current partition, giving the first
-# (total % num_partitions) partitions one extra slave each
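-# For example, 10 slaves spread over 3 zones gives partitions of 4, 3, and 3.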
-def get_partition(total, num_partitions, current_partitions):
- num_slaves_this_zone = total // num_partitions
- if (total % num_partitions) - current_partitions > 0:
- num_slaves_this_zone += 1
- return num_slaves_this_zone
-
-
-# Gets the IP address, taking into account the --private-ips flag
-def get_ip_address(instance, private_ips=False):
- ip = instance.ip_address if not private_ips else \
- instance.private_ip_address
- return ip
-
-
-# Gets the DNS name, taking into account the --private-ips flag
-def get_dns_name(instance, private_ips=False):
- dns = instance.public_dns_name if not private_ips else \
- instance.private_ip_address
- if not dns:
- raise UsageError("Failed to determine hostname of {0}.\n"
- "Please check that you provided --private-ips if "
- "necessary".format(instance))
- return dns
-
-
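-# Parse and validate the command-line options, connect to EC2, and dispatch the requested
-# action (launch, destroy, login, reboot-slaves, get-master, stop, or start).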
-def real_main():
- (opts, action, cluster_name) = parse_args()
-
- # Input parameter validation
- get_validate_spark_version(opts.spark_version, opts.spark_git_repo)
-
- if opts.wait is not None:
- # NOTE: DeprecationWarnings are silent in 2.7+ by default.
- # To show them, run Python with the -Wdefault switch.
- # See: https://docs.python.org/3.5/whatsnew/2.7.html
- warnings.warn(
- "This option is deprecated and has no effect. "
- "spark-ec2 automatically waits as long as necessary for clusters to start up.",
- DeprecationWarning
- )
-
- if opts.identity_file is not None:
- if not os.path.exists(opts.identity_file):
- print("ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file),
- file=stderr)
- sys.exit(1)
-
- file_mode = os.stat(opts.identity_file).st_mode
- if not (file_mode & S_IRUSR) or not oct(file_mode)[-2:] == '00':
- print("ERROR: The identity file must be accessible only by you.", file=stderr)
- print('You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file),
- file=stderr)
- sys.exit(1)
-
- if opts.instance_type not in EC2_INSTANCE_TYPES:
- print("Warning: Unrecognized EC2 instance type for instance-type: {t}".format(
- t=opts.instance_type), file=stderr)
-
- if opts.master_instance_type != "":
- if opts.master_instance_type not in EC2_INSTANCE_TYPES:
- print("Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format(
- t=opts.master_instance_type), file=stderr)
- # We proceed even with unrecognized instance types, but if both the master and slave types
- # are recognized, make sure they resolve to the same AMI virtualization type.
- if opts.instance_type in EC2_INSTANCE_TYPES and \
- opts.master_instance_type in EC2_INSTANCE_TYPES:
- if EC2_INSTANCE_TYPES[opts.instance_type] != \
- EC2_INSTANCE_TYPES[opts.master_instance_type]:
- print("Error: spark-ec2 currently does not support having a master and slaves "
- "with different AMI virtualization types.", file=stderr)
- print("master instance virtualization type: {t}".format(
- t=EC2_INSTANCE_TYPES[opts.master_instance_type]), file=stderr)
- print("slave instance virtualization type: {t}".format(
- t=EC2_INSTANCE_TYPES[opts.instance_type]), file=stderr)
- sys.exit(1)
-
- if opts.ebs_vol_num > 8:
- print("ebs-vol-num cannot be greater than 8", file=stderr)
- sys.exit(1)
-
- # Prevent values that would break ami_prefix (trailing "/", ".git" suffix, and the startswith check).
- # For now, only allow forks that are themselves named spark-ec2.
- if opts.spark_ec2_git_repo.endswith("/") or \
- opts.spark_ec2_git_repo.endswith(".git") or \
- not opts.spark_ec2_git_repo.startswith("https://github.com") or \
- not opts.spark_ec2_git_repo.endswith("spark-ec2"):
- print("spark-ec2-git-repo must be a github repo and it must not have a trailing / or .git. "
- "Furthermore, we currently only support forks named spark-ec2.", file=stderr)
- sys.exit(1)
-
- if not (opts.deploy_root_dir is None or
- (os.path.isabs(opts.deploy_root_dir) and
- os.path.isdir(opts.deploy_root_dir) and
- os.path.exists(opts.deploy_root_dir))):
- print("--deploy-root-dir must be an absolute path to a directory that exists "
- "on the local file system", file=stderr)
- sys.exit(1)
-
- try:
- if opts.profile is None:
- conn = ec2.connect_to_region(opts.region)
- else:
- conn = ec2.connect_to_region(opts.region, profile_name=opts.profile)
- except Exception as e:
- print((e), file=stderr)
- sys.exit(1)
-
- # Select an AZ at random if it was not specified.
- if opts.zone == "":
- opts.zone = random.choice(conn.get_all_zones()).name
-
- if action == "launch":
- if opts.slaves <= 0:
- print("ERROR: You have to start at least 1 slave", file=sys.stderr)
- sys.exit(1)
- if opts.resume:
- (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
- else:
- (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
- wait_for_cluster_state(
- conn=conn,
- opts=opts,
- cluster_instances=(master_nodes + slave_nodes),
- cluster_state='ssh-ready'
- )
- setup_cluster(conn, master_nodes, slave_nodes, opts, True)
-
- elif action == "destroy":
- (master_nodes, slave_nodes) = get_existing_cluster(
- conn, opts, cluster_name, die_on_error=False)
-
- if any(master_nodes + slave_nodes):
- print("The following instances will be terminated:")
- for inst in master_nodes + slave_nodes:
- print("> %s" % get_dns_name(inst, opts.private_ips))
- print("ALL DATA ON ALL NODES WILL BE LOST!!")
-
- msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name)
- response = raw_input(msg)
- if response == "y":
- print("Terminating master...")
- for inst in master_nodes:
- inst.terminate()
- print("Terminating slaves...")
- for inst in slave_nodes:
- inst.terminate()
-
- # Delete security groups as well
- if opts.delete_groups:
- group_names = [cluster_name + "-master", cluster_name + "-slaves"]
- wait_for_cluster_state(
- conn=conn,
- opts=opts,
- cluster_instances=(master_nodes + slave_nodes),
- cluster_state='terminated'
- )
- print("Deleting security groups (this will take some time)...")
- attempt = 1
- while attempt <= 3:
- print("Attempt %d" % attempt)
- groups = [g for g in conn.get_all_security_groups() if g.name in group_names]
- success = True
- # Delete individual rules in all groups before deleting groups to
- # remove dependencies between them
- for group in groups:
- print("Deleting rules in security group " + group.name)
- for rule in group.rules:
- for grant in rule.grants:
- success &= group.revoke(ip_protocol=rule.ip_protocol,
- from_port=rule.from_port,
- to_port=rule.to_port,
- src_group=grant)
-
- # Sleep for AWS eventual-consistency to catch up, and for instances
- # to terminate
- time.sleep(30) # Yes, it does have to be this long :-(
- for group in groups:
- try:
- # We need to delete by group_id for this to work with VPC
- conn.delete_security_group(group_id=group.id)
- print("Deleted security group %s" % group.name)
- except boto.exception.EC2ResponseError:
- success = False
- print("Failed to delete security group %s" % group.name)
-
- # Unfortunately, group.revoke() returns True even if a rule was not
- # deleted, so this needs to be rerun if something fails
- if success:
- break
-
- attempt += 1
-
- if not success:
- print("Failed to delete all security groups after 3 tries.")
- print("Try re-running in a few minutes.")
-
- elif action == "login":
- (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
- if not master_nodes[0].public_dns_name and not opts.private_ips:
- print("Master has no public DNS name. Maybe you meant to specify --private-ips?")
- else:
- master = get_dns_name(master_nodes[0], opts.private_ips)
- print("Logging into master " + master + "...")
- proxy_opt = []
- if opts.proxy_port is not None:
- proxy_opt = ['-D', opts.proxy_port]
- subprocess.check_call(
- ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)])
-
- elif action == "reboot-slaves":
- response = raw_input(
- "Are you sure you want to reboot the cluster " +
- cluster_name + " slaves?\n" +
- "Reboot cluster slaves " + cluster_name + " (y/N): ")
- if response == "y":
- (master_nodes, slave_nodes) = get_existing_cluster(
- conn, opts, cluster_name, die_on_error=False)
- print("Rebooting slaves...")
- for inst in slave_nodes:
- if inst.state not in ["shutting-down", "terminated"]:
- print("Rebooting " + inst.id)
- inst.reboot()
-
- elif action == "get-master":
- (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
- if not master_nodes[0].public_dns_name and not opts.private_ips:
- print("Master has no public DNS name. Maybe you meant to specify --private-ips?")
- else:
- print(get_dns_name(master_nodes[0], opts.private_ips))
-
- elif action == "stop":
- response = raw_input(
- "Are you sure you want to stop the cluster " +
- cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
- "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
- "AMAZON EBS IF IT IS EBS-BACKED!!\n" +
- "All data on spot-instance slaves will be lost.\n" +
- "Stop cluster " + cluster_name + " (y/N): ")
- if response == "y":
- (master_nodes, slave_nodes) = get_existing_cluster(
- conn, opts, cluster_name, die_on_error=False)
- print("Stopping master...")
- for inst in master_nodes:
- if inst.state not in ["shutting-down", "terminated"]:
- inst.stop()
- print("Stopping slaves...")
- for inst in slave_nodes:
- if inst.state not in ["shutting-down", "terminated"]:
- if inst.spot_instance_request_id:
- inst.terminate()
- else:
- inst.stop()
-
- elif action == "start":
- (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
- print("Starting slaves...")
- for inst in slave_nodes:
- if inst.state not in ["shutting-down", "terminated"]:
- inst.start()
- print("Starting master...")
- for inst in master_nodes:
- if inst.state not in ["shutting-down", "terminated"]:
- inst.start()
- wait_for_cluster_state(
- conn=conn,
- opts=opts,
- cluster_instances=(master_nodes + slave_nodes),
- cluster_state='ssh-ready'
- )
-
- # Determine types of running instances
- existing_master_type = master_nodes[0].instance_type
- existing_slave_type = slave_nodes[0].instance_type
- # Setting opts.master_instance_type to the empty string indicates we
- # have the same instance type for the master and the slaves
- if existing_master_type == existing_slave_type:
- existing_master_type = ""
- opts.master_instance_type = existing_master_type
- opts.instance_type = existing_slave_type
-
- setup_cluster(conn, master_nodes, slave_nodes, opts, False)
-
- else:
- print("Invalid action: %s" % action, file=stderr)
- sys.exit(1)
-
-
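-# Entry point: run real_main() and convert a UsageError into an error message and a non-zero exit.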
-def main():
- try:
- real_main()
- except UsageError as e:
- print("\nError:\n", e, file=stderr)
- sys.exit(1)
-
-
-if __name__ == "__main__":
- logging.basicConfig()
- main()
diff --git a/make-distribution.sh b/make-distribution.sh
index a38fd8df17..327659298e 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -212,7 +212,6 @@ cp "$SPARK_HOME/README.md" "$DISTDIR"
cp -r "$SPARK_HOME/bin" "$DISTDIR"
cp -r "$SPARK_HOME/python" "$DISTDIR"
cp -r "$SPARK_HOME/sbin" "$DISTDIR"
-cp -r "$SPARK_HOME/ec2" "$DISTDIR"
# Copy SparkR if it exists
if [ -d "$SPARK_HOME"/R/lib/SparkR ]; then
mkdir -p "$DISTDIR"/R/lib