aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore1
-rw-r--r--bin/compute-classpath.cmd124
-rwxr-xr-xbin/compute-classpath.sh161
-rw-r--r--bin/load-spark-env.sh8
-rwxr-xr-xbin/pyspark59
-rw-r--r--bin/pyspark2.cmd57
-rwxr-xr-xbin/run-example2
-rwxr-xr-xbin/spark-class180
-rw-r--r--bin/spark-class2.cmd141
-rwxr-xr-xbin/spark-shell23
-rw-r--r--bin/spark-shell2.cmd27
-rwxr-xr-xbin/spark-sql20
-rwxr-xr-xbin/spark-submit66
-rw-r--r--bin/spark-submit2.cmd71
-rwxr-xr-xbin/utils.sh60
-rw-r--r--bin/windows-utils.cmd60
-rw-r--r--core/pom.xml5
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala157
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala170
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala46
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/launcher/SparkSubmitArgumentsParser.scala25
-rw-r--r--core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala50
-rw-r--r--docs/programming-guide.md5
-rw-r--r--launcher/pom.xml83
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java362
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java296
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/Main.java173
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java108
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java279
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java327
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java224
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/package-info.java45
-rw-r--r--launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java101
-rw-r--r--launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java94
-rw-r--r--launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java278
-rw-r--r--launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java108
-rw-r--r--launcher/src/test/resources/log4j.properties31
-rwxr-xr-xmake-distribution.sh2
-rw-r--r--pom.xml3
-rw-r--r--project/SparkBuild.scala7
-rw-r--r--python/pyspark/java_gateway.py3
-rwxr-xr-xsbin/spark-daemon.sh84
-rwxr-xr-xsbin/start-thriftserver.sh2
44 files changed, 2891 insertions, 1238 deletions
diff --git a/.gitignore b/.gitignore
index 9757054a50..d162fa9cca 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@
*.iml
*.iws
*.pyc
+*.pyo
.idea/
.idea_modules/
build/*.jar
diff --git a/bin/compute-classpath.cmd b/bin/compute-classpath.cmd
deleted file mode 100644
index 088f993954..0000000000
--- a/bin/compute-classpath.cmd
+++ /dev/null
@@ -1,124 +0,0 @@
-@echo off
-
-rem
-rem Licensed to the Apache Software Foundation (ASF) under one or more
-rem contributor license agreements. See the NOTICE file distributed with
-rem this work for additional information regarding copyright ownership.
-rem The ASF licenses this file to You under the Apache License, Version 2.0
-rem (the "License"); you may not use this file except in compliance with
-rem the License. You may obtain a copy of the License at
-rem
-rem http://www.apache.org/licenses/LICENSE-2.0
-rem
-rem Unless required by applicable law or agreed to in writing, software
-rem distributed under the License is distributed on an "AS IS" BASIS,
-rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-rem See the License for the specific language governing permissions and
-rem limitations under the License.
-rem
-
-rem This script computes Spark's classpath and prints it to stdout; it's used by both the "run"
-rem script and the ExecutorRunner in standalone cluster mode.
-
-rem If we're called from spark-class2.cmd, it already set enabledelayedexpansion and setting
-rem it here would stop us from affecting its copy of the CLASSPATH variable; otherwise we
-rem need to set it here because we use !datanucleus_jars! below.
-if "%DONT_PRINT_CLASSPATH%"=="1" goto skip_delayed_expansion
-setlocal enabledelayedexpansion
-:skip_delayed_expansion
-
-set SCALA_VERSION=2.10
-
-rem Figure out where the Spark framework is installed
-set FWDIR=%~dp0..\
-
-rem Load environment variables from conf\spark-env.cmd, if it exists
-if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
-
-rem Build up classpath
-set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%
-
-if not "x%SPARK_CONF_DIR%"=="x" (
- set CLASSPATH=%CLASSPATH%;%SPARK_CONF_DIR%
-) else (
- set CLASSPATH=%CLASSPATH%;%FWDIR%conf
-)
-
-if exist "%FWDIR%RELEASE" (
- for %%d in ("%FWDIR%lib\spark-assembly*.jar") do (
- set ASSEMBLY_JAR=%%d
- )
-) else (
- for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do (
- set ASSEMBLY_JAR=%%d
- )
-)
-
-set CLASSPATH=%CLASSPATH%;%ASSEMBLY_JAR%
-
-rem When Hive support is needed, Datanucleus jars must be included on the classpath.
-rem Datanucleus jars do not work if only included in the uber jar as plugin.xml metadata is lost.
-rem Both sbt and maven will populate "lib_managed/jars/" with the datanucleus jars when Spark is
-rem built with Hive, so look for them there.
-if exist "%FWDIR%RELEASE" (
- set datanucleus_dir=%FWDIR%lib
-) else (
- set datanucleus_dir=%FWDIR%lib_managed\jars
-)
-set "datanucleus_jars="
-for %%d in ("%datanucleus_dir%\datanucleus-*.jar") do (
- set datanucleus_jars=!datanucleus_jars!;%%d
-)
-set CLASSPATH=%CLASSPATH%;%datanucleus_jars%
-
-set SPARK_CLASSES=%FWDIR%core\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%repl\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%mllib\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%bagel\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%graphx\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%streaming\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%tools\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%sql\catalyst\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%sql\core\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%sql\hive\target\scala-%SCALA_VERSION%\classes
-
-set SPARK_TEST_CLASSES=%FWDIR%core\target\scala-%SCALA_VERSION%\test-classes
-set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%repl\target\scala-%SCALA_VERSION%\test-classes
-set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%mllib\target\scala-%SCALA_VERSION%\test-classes
-set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%bagel\target\scala-%SCALA_VERSION%\test-classes
-set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%graphx\target\scala-%SCALA_VERSION%\test-classes
-set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%streaming\target\scala-%SCALA_VERSION%\test-classes
-set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%sql\catalyst\target\scala-%SCALA_VERSION%\test-classes
-set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%sql\core\target\scala-%SCALA_VERSION%\test-classes
-set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%sql\hive\target\scala-%SCALA_VERSION%\test-classes
-
-if "x%SPARK_TESTING%"=="x1" (
- rem Add test clases to path - note, add SPARK_CLASSES and SPARK_TEST_CLASSES before CLASSPATH
- rem so that local compilation takes precedence over assembled jar
- set CLASSPATH=%SPARK_CLASSES%;%SPARK_TEST_CLASSES%;%CLASSPATH%
-)
-
-rem Add hadoop conf dir - else FileSystem.*, etc fail
-rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
-rem the configurtion files.
-if "x%HADOOP_CONF_DIR%"=="x" goto no_hadoop_conf_dir
- set CLASSPATH=%CLASSPATH%;%HADOOP_CONF_DIR%
-:no_hadoop_conf_dir
-
-if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir
- set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%
-:no_yarn_conf_dir
-
-rem To allow for distributions to append needed libraries to the classpath (e.g. when
-rem using the "hadoop-provided" profile to build Spark), check SPARK_DIST_CLASSPATH and
-rem append it to tbe final classpath.
-if not "x%$SPARK_DIST_CLASSPATH%"=="x" (
- set CLASSPATH=%CLASSPATH%;%SPARK_DIST_CLASSPATH%
-)
-
-rem A bit of a hack to allow calling this script within run2.cmd without seeing output
-if "%DONT_PRINT_CLASSPATH%"=="1" goto exit
-
-echo %CLASSPATH%
-
-:exit
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
deleted file mode 100755
index f4f6b7b909..0000000000
--- a/bin/compute-classpath.sh
+++ /dev/null
@@ -1,161 +0,0 @@
-#!/usr/bin/env bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# This script computes Spark's classpath and prints it to stdout; it's used by both the "run"
-# script and the ExecutorRunner in standalone cluster mode.
-
-# Figure out where Spark is installed
-FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
-
-. "$FWDIR"/bin/load-spark-env.sh
-
-if [ -n "$SPARK_CLASSPATH" ]; then
- CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH"
-else
- CLASSPATH="$SPARK_SUBMIT_CLASSPATH"
-fi
-
-# Build up classpath
-if [ -n "$SPARK_CONF_DIR" ]; then
- CLASSPATH="$CLASSPATH:$SPARK_CONF_DIR"
-else
- CLASSPATH="$CLASSPATH:$FWDIR/conf"
-fi
-
-ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SPARK_SCALA_VERSION"
-
-if [ -n "$JAVA_HOME" ]; then
- JAR_CMD="$JAVA_HOME/bin/jar"
-else
- JAR_CMD="jar"
-fi
-
-# A developer option to prepend more recently compiled Spark classes
-if [ -n "$SPARK_PREPEND_CLASSES" ]; then
- echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\
- "classes ahead of assembly." >&2
- # Spark classes
- CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SPARK_SCALA_VERSION/classes"
- # Jars for shaded deps in their original form (copied here during build)
- CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*"
-fi
-
-# Use spark-assembly jar from either RELEASE or assembly directory
-if [ -f "$FWDIR/RELEASE" ]; then
- assembly_folder="$FWDIR"/lib
-else
- assembly_folder="$ASSEMBLY_DIR"
-fi
-
-num_jars=0
-
-for f in "${assembly_folder}"/spark-assembly*hadoop*.jar; do
- if [[ ! -e "$f" ]]; then
- echo "Failed to find Spark assembly in $assembly_folder" 1>&2
- echo "You need to build Spark before running this program." 1>&2
- exit 1
- fi
- ASSEMBLY_JAR="$f"
- num_jars=$((num_jars+1))
-done
-
-if [ "$num_jars" -gt "1" ]; then
- echo "Found multiple Spark assembly jars in $assembly_folder:" 1>&2
- ls "${assembly_folder}"/spark-assembly*hadoop*.jar 1>&2
- echo "Please remove all but one jar." 1>&2
- exit 1
-fi
-
-# Verify that versions of java used to build the jars and run Spark are compatible
-jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1)
-if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
- echo "Loading Spark jar with '$JAR_CMD' failed. " 1>&2
- echo "This is likely because Spark was compiled with Java 7 and run " 1>&2
- echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark " 1>&2
- echo "or build Spark with Java 6." 1>&2
- exit 1
-fi
-
-CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR"
-
-# When Hive support is needed, Datanucleus jars must be included on the classpath.
-# Datanucleus jars do not work if only included in the uber jar as plugin.xml metadata is lost.
-# Both sbt and maven will populate "lib_managed/jars/" with the datanucleus jars when Spark is
-# built with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark
-# assembly is built for Hive, before actually populating the CLASSPATH with the jars.
-# Note that this check order is faster (by up to half a second) in the case where Hive is not used.
-if [ -f "$FWDIR/RELEASE" ]; then
- datanucleus_dir="$FWDIR"/lib
-else
- datanucleus_dir="$FWDIR"/lib_managed/jars
-fi
-
-datanucleus_jars="$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar$")"
-datanucleus_jars="$(echo "$datanucleus_jars" | tr "\n" : | sed s/:$//g)"
-
-if [ -n "$datanucleus_jars" ]; then
- hive_files=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" org/apache/hadoop/hive/ql/exec 2>/dev/null)
- if [ -n "$hive_files" ]; then
- echo "Spark assembly has been built with Hive, including Datanucleus jars on classpath" 1>&2
- CLASSPATH="$CLASSPATH:$datanucleus_jars"
- fi
-fi
-
-# Add test classes if we're running from SBT or Maven with SPARK_TESTING set to 1
-if [[ $SPARK_TESTING == 1 ]]; then
- CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SPARK_SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SPARK_SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SPARK_SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SPARK_SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/test-classes"
-fi
-
-# Add hadoop conf dir if given -- otherwise FileSystem.*, etc fail !
-# Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
-# the configurtion files.
-if [ -n "$HADOOP_CONF_DIR" ]; then
- CLASSPATH="$CLASSPATH:$HADOOP_CONF_DIR"
-fi
-if [ -n "$YARN_CONF_DIR" ]; then
- CLASSPATH="$CLASSPATH:$YARN_CONF_DIR"
-fi
-
-# To allow for distributions to append needed libraries to the classpath (e.g. when
-# using the "hadoop-provided" profile to build Spark), check SPARK_DIST_CLASSPATH and
-# append it to tbe final classpath.
-if [ -n "$SPARK_DIST_CLASSPATH" ]; then
- CLASSPATH="$CLASSPATH:$SPARK_DIST_CLASSPATH"
-fi
-
-echo "$CLASSPATH"
diff --git a/bin/load-spark-env.sh b/bin/load-spark-env.sh
index 356b3d49b2..2d7070c25d 100644
--- a/bin/load-spark-env.sh
+++ b/bin/load-spark-env.sh
@@ -41,9 +41,9 @@ fi
if [ -z "$SPARK_SCALA_VERSION" ]; then
- ASSEMBLY_DIR2="$FWDIR/assembly/target/scala-2.11"
- ASSEMBLY_DIR1="$FWDIR/assembly/target/scala-2.10"
-
+ ASSEMBLY_DIR2="$SPARK_HOME/assembly/target/scala-2.11"
+ ASSEMBLY_DIR1="$SPARK_HOME/assembly/target/scala-2.10"
+
if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
echo -e "Presence of build for both scala versions(SCALA 2.10 and SCALA 2.11) detected." 1>&2
echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION=2.11 in spark-env.sh.' 1>&2
@@ -54,5 +54,5 @@ if [ -z "$SPARK_SCALA_VERSION" ]; then
export SPARK_SCALA_VERSION="2.11"
else
export SPARK_SCALA_VERSION="2.10"
- fi
+ fi
fi
diff --git a/bin/pyspark b/bin/pyspark
index 0b4f695dd0..e7f6a1a072 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -18,36 +18,24 @@
#
# Figure out where Spark is installed
-FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
+export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-# Export this as SPARK_HOME
-export SPARK_HOME="$FWDIR"
-
-source "$FWDIR/bin/utils.sh"
-
-source "$FWDIR"/bin/load-spark-env.sh
+source "$SPARK_HOME"/bin/load-spark-env.sh
function usage() {
+ if [ -n "$1" ]; then
+ echo $1
+ fi
echo "Usage: ./bin/pyspark [options]" 1>&2
- "$FWDIR"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
- exit 0
+ "$SPARK_HOME"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+ exit $2
}
+export -f usage
if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
usage
fi
-# Exit if the user hasn't compiled Spark
-if [ ! -f "$FWDIR/RELEASE" ]; then
- # Exit if the user hasn't compiled Spark
- ls "$FWDIR"/assembly/target/scala-$SPARK_SCALA_VERSION/spark-assembly*hadoop*.jar >& /dev/null
- if [[ $? != 0 ]]; then
- echo "Failed to find Spark assembly in $FWDIR/assembly/target" 1>&2
- echo "You need to build Spark before running this program" 1>&2
- exit 1
- fi
-fi
-
# In Spark <= 1.1, setting IPYTHON=1 would cause the driver to be launched using the `ipython`
# executable, while the worker would still be launched using PYSPARK_PYTHON.
#
@@ -95,26 +83,13 @@ export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
-export PYTHONSTARTUP="$FWDIR/python/pyspark/shell.py"
-
-# Build up arguments list manually to preserve quotes and backslashes.
-# We export Spark submit arguments as an environment variable because shell.py must run as a
-# PYTHONSTARTUP script, which does not take in arguments. This is required for IPython notebooks.
-SUBMIT_USAGE_FUNCTION=usage
-gatherSparkSubmitOpts "$@"
-PYSPARK_SUBMIT_ARGS=""
-whitespace="[[:space:]]"
-for i in "${SUBMISSION_OPTS[@]}"; do
- if [[ $i =~ \" ]]; then i=$(echo $i | sed 's/\"/\\\"/g'); fi
- if [[ $i =~ $whitespace ]]; then i=\"$i\"; fi
- PYSPARK_SUBMIT_ARGS="$PYSPARK_SUBMIT_ARGS $i"
-done
-export PYSPARK_SUBMIT_ARGS
+export PYTHONSTARTUP="$SPARK_HOME/python/pyspark/shell.py"
# For pyspark tests
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
+ export PYSPARK_SUBMIT_ARGS=pyspark-shell
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
else
@@ -123,14 +98,6 @@ if [[ -n "$SPARK_TESTING" ]]; then
exit
fi
-# If a python file is provided, directly run spark-submit.
-if [[ "$1" =~ \.py$ ]]; then
- echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0." 1>&2
- echo -e "Use ./bin/spark-submit <python file>\n" 1>&2
- primary="$1"
- shift
- gatherSparkSubmitOpts "$@"
- exec "$FWDIR"/bin/spark-submit "${SUBMISSION_OPTS[@]}" "$primary" "${APPLICATION_OPTS[@]}"
-else
- exec "$PYSPARK_DRIVER_PYTHON" $PYSPARK_DRIVER_PYTHON_OPTS
-fi
+export PYSPARK_DRIVER_PYTHON
+export PYSPARK_DRIVER_PYTHON_OPTS
+exec "$SPARK_HOME"/bin/spark-submit pyspark-shell-main "$@"
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index a542ec80b4..4f5eb5e206 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -17,59 +17,22 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
-set SCALA_VERSION=2.10
-
rem Figure out where the Spark framework is installed
-set FWDIR=%~dp0..\
-
-rem Export this as SPARK_HOME
-set SPARK_HOME=%FWDIR%
-
-rem Test whether the user has built Spark
-if exist "%FWDIR%RELEASE" goto skip_build_test
-set FOUND_JAR=0
-for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do (
- set FOUND_JAR=1
-)
-if [%FOUND_JAR%] == [0] (
- echo Failed to find Spark assembly JAR.
- echo You need to build Spark before running this program.
- goto exit
-)
-:skip_build_test
+set SPARK_HOME=%~dp0..
rem Load environment variables from conf\spark-env.cmd, if it exists
-if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
+if exist "%SPARK_HOME%\conf\spark-env.cmd" call "%SPARK_HOME%\conf\spark-env.cmd"
rem Figure out which Python to use.
-if [%PYSPARK_PYTHON%] == [] set PYSPARK_PYTHON=python
+if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
+ set PYSPARK_DRIVER_PYTHON=python
+ if not [%PYSPARK_PYTHON%] == [] set PYSPARK_DRIVER_PYTHON=%PYSPARK_PYTHON%
+)
-set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
-set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.2.1-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.8.2.1-src.zip;%PYTHONPATH%
set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
-set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
-set PYSPARK_SUBMIT_ARGS=%*
-
-echo Running %PYSPARK_PYTHON% with PYTHONPATH=%PYTHONPATH%
-
-rem Check whether the argument is a file
-for /f %%i in ('echo %1^| findstr /R "\.py"') do (
- set PYTHON_FILE=%%i
-)
-
-if [%PYTHON_FILE%] == [] (
- if [%IPYTHON%] == [1] (
- ipython %IPYTHON_OPTS%
- ) else (
- %PYSPARK_PYTHON%
- )
-) else (
- echo.
- echo WARNING: Running python applications through ./bin/pyspark.cmd is deprecated as of Spark 1.0.
- echo Use ./bin/spark-submit ^<python file^>
- echo.
- "%FWDIR%\bin\spark-submit.cmd" %PYSPARK_SUBMIT_ARGS%
-)
+set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
-:exit
+call %SPARK_HOME%\bin\spark-submit2.cmd pyspark-shell-main %*
diff --git a/bin/run-example b/bin/run-example
index a106411392..798e2caeb8 100755
--- a/bin/run-example
+++ b/bin/run-example
@@ -67,7 +67,7 @@ if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
EXAMPLE_CLASS="org.apache.spark.examples.$EXAMPLE_CLASS"
fi
-"$FWDIR"/bin/spark-submit \
+exec "$FWDIR"/bin/spark-submit \
--master $EXAMPLE_MASTER \
--class $EXAMPLE_CLASS \
"$SPARK_EXAMPLES_JAR" \
diff --git a/bin/spark-class b/bin/spark-class
index 2f0441bb3c..e29b234afa 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -16,89 +16,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-# NOTE: Any changes to this file must be reflected in SparkSubmitDriverBootstrapper.scala!
-
-cygwin=false
-case "`uname`" in
- CYGWIN*) cygwin=true;;
-esac
+set -e
# Figure out where Spark is installed
-FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
+export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-# Export this as SPARK_HOME
-export SPARK_HOME="$FWDIR"
-export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"$SPARK_HOME/conf"}"
-
-. "$FWDIR"/bin/load-spark-env.sh
+. "$SPARK_HOME"/bin/load-spark-env.sh
if [ -z "$1" ]; then
echo "Usage: spark-class <class> [<args>]" 1>&2
exit 1
fi
-if [ -n "$SPARK_MEM" ]; then
- echo -e "Warning: SPARK_MEM is deprecated, please use a more specific config option" 1>&2
- echo -e "(e.g., spark.executor.memory or spark.driver.memory)." 1>&2
-fi
-
-# Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
-DEFAULT_MEM=${SPARK_MEM:-512m}
-
-SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
-
-# Add java opts and memory settings for master, worker, history server, executors, and repl.
-case "$1" in
- # Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
- 'org.apache.spark.deploy.master.Master')
- OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS"
- OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
- ;;
- 'org.apache.spark.deploy.worker.Worker')
- OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS"
- OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
- ;;
- 'org.apache.spark.deploy.history.HistoryServer')
- OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_HISTORY_OPTS"
- OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
- ;;
-
- # Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
- 'org.apache.spark.executor.CoarseGrainedExecutorBackend')
- OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
- OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
- ;;
- 'org.apache.spark.executor.MesosExecutorBackend')
- OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
- OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
- export PYTHONPATH="$FWDIR/python:$PYTHONPATH"
- export PYTHONPATH="$FWDIR/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
- ;;
-
- # Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
- # SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY.
- 'org.apache.spark.deploy.SparkSubmit')
- OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS"
- OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
- if [ -n "$SPARK_SUBMIT_LIBRARY_PATH" ]; then
- if [[ $OSTYPE == darwin* ]]; then
- export DYLD_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:$DYLD_LIBRARY_PATH"
- else
- export LD_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:$LD_LIBRARY_PATH"
- fi
- fi
- if [ -n "$SPARK_SUBMIT_DRIVER_MEMORY" ]; then
- OUR_JAVA_MEM="$SPARK_SUBMIT_DRIVER_MEMORY"
- fi
- ;;
-
- *)
- OUR_JAVA_OPTS="$SPARK_JAVA_OPTS"
- OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
- ;;
-esac
-
# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
@@ -110,83 +39,48 @@ else
exit 1
fi
fi
-JAVA_VERSION=$("$RUNNER" -version 2>&1 | grep 'version' | sed 's/.* version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
-
-# Set JAVA_OPTS to be able to load native libraries and to set heap size
-if [ "$JAVA_VERSION" -ge 18 ]; then
- JAVA_OPTS="$OUR_JAVA_OPTS"
-else
- JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS"
-fi
-JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"
-
-# Load extra JAVA_OPTS from conf/java-opts, if it exists
-if [ -e "$SPARK_CONF_DIR/java-opts" ] ; then
- JAVA_OPTS="$JAVA_OPTS `cat "$SPARK_CONF_DIR"/java-opts`"
-fi
-
-# Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!
-
-TOOLS_DIR="$FWDIR"/tools
-SPARK_TOOLS_JAR=""
-if [ -e "$TOOLS_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-tools*[0-9Tg].jar ]; then
- # Use the JAR from the SBT build
- export SPARK_TOOLS_JAR="`ls "$TOOLS_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-tools*[0-9Tg].jar`"
-fi
-if [ -e "$TOOLS_DIR"/target/spark-tools*[0-9Tg].jar ]; then
- # Use the JAR from the Maven build
- # TODO: this also needs to become an assembly!
- export SPARK_TOOLS_JAR="`ls "$TOOLS_DIR"/target/spark-tools*[0-9Tg].jar`"
-fi
-# Compute classpath using external script
-classpath_output=$("$FWDIR"/bin/compute-classpath.sh)
-if [[ "$?" != "0" ]]; then
- echo "$classpath_output"
- exit 1
-else
- CLASSPATH="$classpath_output"
-fi
+# Look for the launcher. In non-release mode, add the compiled classes directly to the classpath
+# instead of looking for a jar file.
+SPARK_LAUNCHER_CP=
+if [ -f $SPARK_HOME/RELEASE ]; then
+ LAUNCHER_DIR="$SPARK_HOME/lib"
+ num_jars="$(ls -1 "$LAUNCHER_DIR" | grep "^spark-launcher.*\.jar$" | wc -l)"
+ if [ "$num_jars" -eq "0" -a -z "$SPARK_LAUNCHER_CP" ]; then
+ echo "Failed to find Spark launcher in $LAUNCHER_DIR." 1>&2
+ echo "You need to build Spark before running this program." 1>&2
+ exit 1
+ fi
-if [[ "$1" =~ org.apache.spark.tools.* ]]; then
- if test -z "$SPARK_TOOLS_JAR"; then
- echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/" 1>&2
- echo "You need to run \"build/sbt tools/package\" before running $1." 1>&2
+ LAUNCHER_JARS="$(ls -1 "$LAUNCHER_DIR" | grep "^spark-launcher.*\.jar$" || true)"
+ if [ "$num_jars" -gt "1" ]; then
+ echo "Found multiple Spark launcher jars in $LAUNCHER_DIR:" 1>&2
+ echo "$LAUNCHER_JARS" 1>&2
+ echo "Please remove all but one jar." 1>&2
exit 1
fi
- CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
-fi
-if $cygwin; then
- CLASSPATH="`cygpath -wp "$CLASSPATH"`"
- if [ "$1" == "org.apache.spark.tools.JavaAPICompletenessChecker" ]; then
- export SPARK_TOOLS_JAR="`cygpath -w "$SPARK_TOOLS_JAR"`"
+ SPARK_LAUNCHER_CP="${LAUNCHER_DIR}/${LAUNCHER_JARS}"
+else
+ LAUNCHER_DIR="$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION"
+ if [ ! -d "$LAUNCHER_DIR/classes" ]; then
+ echo "Failed to find Spark launcher classes in $LAUNCHER_DIR." 1>&2
+ echo "You need to build Spark before running this program." 1>&2
+ exit 1
fi
+ SPARK_LAUNCHER_CP="$LAUNCHER_DIR/classes"
fi
-export CLASSPATH
-# In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself.
-# Here we must parse the properties file for relevant "spark.driver.*" configs before launching
-# the driver JVM itself. Instead of handling this complexity in Bash, we launch a separate JVM
-# to prepare the launch environment of this driver JVM.
+# The launcher library will print arguments separated by a NULL character, to allow arguments with
+# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
+# an array that will be used to exec the final command.
+CMD=()
+while IFS= read -d '' -r ARG; do
+ CMD+=("$ARG")
+done < <("$RUNNER" -cp "$SPARK_LAUNCHER_CP" org.apache.spark.launcher.Main "$@")
-if [ -n "$SPARK_SUBMIT_BOOTSTRAP_DRIVER" ]; then
- # This is used only if the properties file actually contains these special configs
- # Export the environment variables needed by SparkSubmitDriverBootstrapper
- export RUNNER
- export CLASSPATH
- export JAVA_OPTS
- export OUR_JAVA_MEM
- export SPARK_CLASS=1
- shift # Ignore main class (org.apache.spark.deploy.SparkSubmit) and use our own
- exec "$RUNNER" org.apache.spark.deploy.SparkSubmitDriverBootstrapper "$@"
+if [ "${CMD[0]}" = "usage" ]; then
+ "${CMD[@]}"
else
- # Note: The format of this command is closely echoed in SparkSubmitDriverBootstrapper.scala
- if [ -n "$SPARK_PRINT_LAUNCH_COMMAND" ]; then
- echo -n "Spark Command: " 1>&2
- echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2
- echo -e "========================================\n" 1>&2
- fi
- exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
+ exec "${CMD[@]}"
fi
-
diff --git a/bin/spark-class2.cmd b/bin/spark-class2.cmd
index da46543647..37d22215a0 100644
--- a/bin/spark-class2.cmd
+++ b/bin/spark-class2.cmd
@@ -17,135 +17,54 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
-rem Any changes to this file must be reflected in SparkSubmitDriverBootstrapper.scala!
-
-setlocal enabledelayedexpansion
-
-set SCALA_VERSION=2.10
-
rem Figure out where the Spark framework is installed
-set FWDIR=%~dp0..\
-
-rem Export this as SPARK_HOME
-set SPARK_HOME=%FWDIR%
+set SPARK_HOME=%~dp0..
rem Load environment variables from conf\spark-env.cmd, if it exists
-if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
+if exist "%SPARK_HOME%\conf\spark-env.cmd" call "%SPARK_HOME%\conf\spark-env.cmd"
rem Test that an argument was given
-if not "x%1"=="x" goto arg_given
+if "x%1"=="x" (
echo Usage: spark-class ^<class^> [^<args^>]
- goto exit
-:arg_given
-
-if not "x%SPARK_MEM%"=="x" (
- echo Warning: SPARK_MEM is deprecated, please use a more specific config option
- echo e.g., spark.executor.memory or spark.driver.memory.
+ exit /b 1
)
-rem Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
-set OUR_JAVA_MEM=%SPARK_MEM%
-if "x%OUR_JAVA_MEM%"=="x" set OUR_JAVA_MEM=512m
-
-set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true
-
-rem Add java opts and memory settings for master, worker, history server, executors, and repl.
-rem Master, Worker and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
-if "%1"=="org.apache.spark.deploy.master.Master" (
- set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_MASTER_OPTS%
- if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
-) else if "%1"=="org.apache.spark.deploy.worker.Worker" (
- set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_WORKER_OPTS%
- if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
-) else if "%1"=="org.apache.spark.deploy.history.HistoryServer" (
- set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_HISTORY_OPTS%
- if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
-
-rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
-) else if "%1"=="org.apache.spark.executor.CoarseGrainedExecutorBackend" (
- set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
- if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%
-) else if "%1"=="org.apache.spark.executor.MesosExecutorBackend" (
- set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
- if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%
+set LAUNCHER_CP=0
+if exist %SPARK_HOME%\RELEASE goto find_release_launcher
-rem Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
-rem SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY.
-rem The repl also uses SPARK_REPL_OPTS.
-) else if "%1"=="org.apache.spark.deploy.SparkSubmit" (
- set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_SUBMIT_OPTS% %SPARK_REPL_OPTS%
- if not "x%SPARK_SUBMIT_LIBRARY_PATH%"=="x" (
- set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_SUBMIT_LIBRARY_PATH%
- ) else if not "x%SPARK_LIBRARY_PATH%"=="x" (
- set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_LIBRARY_PATH%
- )
- if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
- if not "x%SPARK_SUBMIT_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_SUBMIT_DRIVER_MEMORY%
-) else (
- set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
- if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
+rem Look for the Spark launcher in both Scala build directories. The launcher doesn't use Scala so
+rem it doesn't really matter which one is picked up. Add the compiled classes directly to the
+rem classpath instead of looking for a jar file, since it's very common for people using sbt to use
+rem the "assembly" target instead of "package".
+set LAUNCHER_CLASSES=%SPARK_HOME%\launcher\target\scala-2.10\classes
+if exist %LAUNCHER_CLASSES% (
+ set LAUNCHER_CP=%LAUNCHER_CLASSES%
)
-
-rem Set JAVA_OPTS to be able to load native libraries and to set heap size
-for /f "tokens=3" %%i in ('java -version 2^>^&1 ^| find "version"') do set jversion=%%i
-for /f "tokens=1 delims=_" %%i in ("%jversion:~1,-1%") do set jversion=%%i
-if "%jversion%" geq "1.8.0" (
- set JAVA_OPTS=%OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
-) else (
- set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
+set LAUNCHER_CLASSES=%SPARK_HOME%\launcher\target\scala-2.11\classes
+if exist %LAUNCHER_CLASSES% (
+ set LAUNCHER_CP=%LAUNCHER_CLASSES%
)
-rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!
+goto check_launcher
-rem Test whether the user has built Spark
-if exist "%FWDIR%RELEASE" goto skip_build_test
-set FOUND_JAR=0
-for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do (
- set FOUND_JAR=1
-)
-if "%FOUND_JAR%"=="0" (
- echo Failed to find Spark assembly JAR.
- echo You need to build Spark before running this program.
- goto exit
+:find_release_launcher
+for %%d in (%SPARK_HOME%\lib\spark-launcher*.jar) do (
+ set LAUNCHER_CP=%%d
)
-:skip_build_test
-set TOOLS_DIR=%FWDIR%tools
-set SPARK_TOOLS_JAR=
-for %%d in ("%TOOLS_DIR%\target\scala-%SCALA_VERSION%\spark-tools*assembly*.jar") do (
- set SPARK_TOOLS_JAR=%%d
+:check_launcher
+if "%LAUNCHER_CP%"=="0" (
+ echo Failed to find Spark launcher JAR.
+ echo You need to build Spark before running this program.
+ exit /b 1
)
-rem Compute classpath using external script
-set DONT_PRINT_CLASSPATH=1
-call "%FWDIR%bin\compute-classpath.cmd"
-set DONT_PRINT_CLASSPATH=0
-set CLASSPATH=%CLASSPATH%;%SPARK_TOOLS_JAR%
-
rem Figure out where java is.
set RUNNER=java
if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
-rem In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself.
-rem Here we must parse the properties file for relevant "spark.driver.*" configs before launching
-rem the driver JVM itself. Instead of handling this complexity here, we launch a separate JVM
-rem to prepare the launch environment of this driver JVM.
-
-rem In this case, leave out the main class (org.apache.spark.deploy.SparkSubmit) and use our own.
-rem Leaving out the first argument is surprisingly difficult to do in Windows. Note that this must
-rem be done here because the Windows "shift" command does not work in a conditional block.
-set BOOTSTRAP_ARGS=
-shift
-:start_parse
-if "%~1" == "" goto end_parse
-set BOOTSTRAP_ARGS=%BOOTSTRAP_ARGS% %~1
-shift
-goto start_parse
-:end_parse
-
-if not [%SPARK_SUBMIT_BOOTSTRAP_DRIVER%] == [] (
- set SPARK_CLASS=1
- "%RUNNER%" org.apache.spark.deploy.SparkSubmitDriverBootstrapper %BOOTSTRAP_ARGS%
-) else (
- "%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
+rem The launcher library prints the command to be executed in a single line suitable for being
+rem executed by the batch interpreter. So read all the output of the launcher into a variable.
+for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %LAUNCHER_CP% org.apache.spark.launcher.Main %*"') do (
+ set SPARK_CMD=%%i
)
-:exit
+%SPARK_CMD%
diff --git a/bin/spark-shell b/bin/spark-shell
index cca5aa0676..b3761b5e13 100755
--- a/bin/spark-shell
+++ b/bin/spark-shell
@@ -28,25 +28,24 @@ esac
# Enter posix mode for bash
set -o posix
-## Global script variables
-FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
+export FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
-function usage() {
+usage() {
+ if [ -n "$1" ]; then
+ echo "$1"
+ fi
echo "Usage: ./bin/spark-shell [options]"
"$FWDIR"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
- exit 0
+ exit "$2"
}
+export -f usage
if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
- usage
+ usage "" 0
fi
-source "$FWDIR"/bin/utils.sh
-SUBMIT_USAGE_FUNCTION=usage
-gatherSparkSubmitOpts "$@"
-
# SPARK-4161: scala does not assume use of the java classpath,
-# so we need to add the "-Dscala.usejavacp=true" flag mnually. We
+# so we need to add the "-Dscala.usejavacp=true" flag manually. We
# do this specifically for the Spark shell because the scala REPL
# has its own class loader, and any additional classpath specified
# through spark.driver.extraClassPath is not automatically propagated.
@@ -61,11 +60,11 @@ function main() {
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
- "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"
+ "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "$@"
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
- "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"
+ "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "$@"
fi
}
diff --git a/bin/spark-shell2.cmd b/bin/spark-shell2.cmd
index 1d1a40da31..02f51fe59a 100644
--- a/bin/spark-shell2.cmd
+++ b/bin/spark-shell2.cmd
@@ -25,17 +25,28 @@ if %ERRORLEVEL% equ 0 (
exit /b 0
)
-call %SPARK_HOME%\bin\windows-utils.cmd %*
-if %ERRORLEVEL% equ 1 (
+rem SPARK-4161: scala does not assume use of the java classpath,
+rem so we need to add the "-Dscala.usejavacp=true" flag manually. We
+rem do this specifically for the Spark shell because the scala REPL
+rem has its own class loader, and any additional classpath specified
+rem through spark.driver.extraClassPath is not automatically propagated.
+if "x%SPARK_SUBMIT_OPTS%"=="x" (
+ set SPARK_SUBMIT_OPTS=-Dscala.usejavacp=true
+ goto run_shell
+)
+set SPARK_SUBMIT_OPTS="%SPARK_SUBMIT_OPTS% -Dscala.usejavacp=true"
+
+:run_shell
+call %SPARK_HOME%\bin\spark-submit2.cmd --class org.apache.spark.repl.Main %*
+set SPARK_ERROR_LEVEL=%ERRORLEVEL%
+if not "x%SPARK_LAUNCHER_USAGE_ERROR%"=="x" (
call :usage
exit /b 1
)
-
-cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd --class org.apache.spark.repl.Main %SUBMISSION_OPTS% spark-shell %APPLICATION_OPTS%
-
-exit /b 0
+exit /b %SPARK_ERROR_LEVEL%
:usage
+echo %SPARK_LAUNCHER_USAGE_ERROR%
echo "Usage: .\bin\spark-shell.cmd [options]" >&2
-%SPARK_HOME%\bin\spark-submit --help 2>&1 | findstr /V "Usage" 1>&2
-exit /b 0
+call %SPARK_HOME%\bin\spark-submit2.cmd --help 2>&1 | findstr /V "Usage" 1>&2
+goto :eof
diff --git a/bin/spark-sql b/bin/spark-sql
index 3b6cc420fe..ca1729f4cf 100755
--- a/bin/spark-sql
+++ b/bin/spark-sql
@@ -25,12 +25,15 @@ set -o posix
# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
-CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
+export CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
# Figure out where Spark is installed
-FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
+export FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
function usage {
+ if [ -n "$1" ]; then
+ echo "$1"
+ fi
echo "Usage: ./bin/spark-sql [options] [cli option]"
pattern="usage"
pattern+="\|Spark assembly has been built with Hive"
@@ -42,16 +45,13 @@ function usage {
"$FWDIR"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
echo
echo "CLI options:"
- "$FWDIR"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
+ "$FWDIR"/bin/spark-class "$CLASS" --help 2>&1 | grep -v "$pattern" 1>&2
+ exit "$2"
}
+export -f usage
if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
- usage
- exit 0
+ usage "" 0
fi
-source "$FWDIR"/bin/utils.sh
-SUBMIT_USAGE_FUNCTION=usage
-gatherSparkSubmitOpts "$@"
-
-exec "$FWDIR"/bin/spark-submit --class $CLASS "${SUBMISSION_OPTS[@]}" spark-internal "${APPLICATION_OPTS[@]}"
+exec "$FWDIR"/bin/spark-submit --class "$CLASS" "$@"
diff --git a/bin/spark-submit b/bin/spark-submit
index 3e5cbdbb24..bcff78edd5 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -17,58 +17,18 @@
# limitations under the License.
#
-# NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
-
-export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-ORIG_ARGS=("$@")
-
-# Set COLUMNS for progress bar
-export COLUMNS=`tput cols`
-
-while (($#)); do
- if [ "$1" = "--deploy-mode" ]; then
- SPARK_SUBMIT_DEPLOY_MODE=$2
- elif [ "$1" = "--properties-file" ]; then
- SPARK_SUBMIT_PROPERTIES_FILE=$2
- elif [ "$1" = "--driver-memory" ]; then
- export SPARK_SUBMIT_DRIVER_MEMORY=$2
- elif [ "$1" = "--driver-library-path" ]; then
- export SPARK_SUBMIT_LIBRARY_PATH=$2
- elif [ "$1" = "--driver-class-path" ]; then
- export SPARK_SUBMIT_CLASSPATH=$2
- elif [ "$1" = "--driver-java-options" ]; then
- export SPARK_SUBMIT_OPTS=$2
- elif [ "$1" = "--master" ]; then
- export MASTER=$2
- fi
- shift
-done
-
-if [ -z "$SPARK_CONF_DIR" ]; then
- export SPARK_CONF_DIR="$SPARK_HOME/conf"
-fi
-DEFAULT_PROPERTIES_FILE="$SPARK_CONF_DIR/spark-defaults.conf"
-if [ "$MASTER" == "yarn-cluster" ]; then
- SPARK_SUBMIT_DEPLOY_MODE=cluster
+SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
+
+# Only define a usage function if an upstream script hasn't done so.
+if ! type -t usage >/dev/null 2>&1; then
+ usage() {
+ if [ -n "$1" ]; then
+ echo "$1"
+ fi
+ "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit --help
+ exit "$2"
+ }
+ export -f usage
fi
-export SPARK_SUBMIT_DEPLOY_MODE=${SPARK_SUBMIT_DEPLOY_MODE:-"client"}
-export SPARK_SUBMIT_PROPERTIES_FILE=${SPARK_SUBMIT_PROPERTIES_FILE:-"$DEFAULT_PROPERTIES_FILE"}
-
-# For client mode, the driver will be launched in the same JVM that launches
-# SparkSubmit, so we may need to read the properties file for any extra class
-# paths, library paths, java options and memory early on. Otherwise, it will
-# be too late by the time the driver JVM has started.
-
-if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FILE" ]]; then
- # Parse the properties file only if the special configs exist
- contains_special_configs=$(
- grep -e "spark.driver.extra*\|spark.driver.memory" "$SPARK_SUBMIT_PROPERTIES_FILE" | \
- grep -v "^[[:space:]]*#"
- )
- if [ -n "$contains_special_configs" ]; then
- export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
- fi
-fi
-
-exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
+exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
diff --git a/bin/spark-submit2.cmd b/bin/spark-submit2.cmd
index 446cbc74b7..08ddb18574 100644
--- a/bin/spark-submit2.cmd
+++ b/bin/spark-submit2.cmd
@@ -17,62 +17,19 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
-rem NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
-
-set SPARK_HOME=%~dp0..
-set ORIG_ARGS=%*
-
-rem Reset the values of all variables used
-set SPARK_SUBMIT_DEPLOY_MODE=client
-
-if [%SPARK_CONF_DIR%] == [] (
- set SPARK_CONF_DIR=%SPARK_HOME%\conf
-)
-set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_CONF_DIR%\spark-defaults.conf
-set SPARK_SUBMIT_DRIVER_MEMORY=
-set SPARK_SUBMIT_LIBRARY_PATH=
-set SPARK_SUBMIT_CLASSPATH=
-set SPARK_SUBMIT_OPTS=
-set SPARK_SUBMIT_BOOTSTRAP_DRIVER=
-
-:loop
-if [%1] == [] goto continue
- if [%1] == [--deploy-mode] (
- set SPARK_SUBMIT_DEPLOY_MODE=%2
- ) else if [%1] == [--properties-file] (
- set SPARK_SUBMIT_PROPERTIES_FILE=%2
- ) else if [%1] == [--driver-memory] (
- set SPARK_SUBMIT_DRIVER_MEMORY=%2
- ) else if [%1] == [--driver-library-path] (
- set SPARK_SUBMIT_LIBRARY_PATH=%2
- ) else if [%1] == [--driver-class-path] (
- set SPARK_SUBMIT_CLASSPATH=%2
- ) else if [%1] == [--driver-java-options] (
- set SPARK_SUBMIT_OPTS=%2
- ) else if [%1] == [--master] (
- set MASTER=%2
- )
- shift
-goto loop
-:continue
-
-if [%MASTER%] == [yarn-cluster] (
- set SPARK_SUBMIT_DEPLOY_MODE=cluster
-)
-
-rem For client mode, the driver will be launched in the same JVM that launches
-rem SparkSubmit, so we may need to read the properties file for any extra class
-rem paths, library paths, java options and memory early on. Otherwise, it will
-rem be too late by the time the driver JVM has started.
-
-if [%SPARK_SUBMIT_DEPLOY_MODE%] == [client] (
- if exist %SPARK_SUBMIT_PROPERTIES_FILE% (
- rem Parse the properties file only if the special configs exist
- for /f %%i in ('findstr /r /c:"^[\t ]*spark.driver.memory" /c:"^[\t ]*spark.driver.extra" ^
- %SPARK_SUBMIT_PROPERTIES_FILE%') do (
- set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
- )
- )
+rem This is the entry point for running Spark submit. To avoid polluting the
+rem environment, it just launches a new cmd to do the real work.
+
+set CLASS=org.apache.spark.deploy.SparkSubmit
+call %~dp0spark-class2.cmd %CLASS% %*
+set SPARK_ERROR_LEVEL=%ERRORLEVEL%
+if not "x%SPARK_LAUNCHER_USAGE_ERROR%"=="x" (
+ call :usage
+ exit /b 1
)
+exit /b %SPARK_ERROR_LEVEL%
-cmd /V /E /C %SPARK_HOME%\bin\spark-class.cmd org.apache.spark.deploy.SparkSubmit %ORIG_ARGS%
+:usage
+echo %SPARK_LAUNCHER_USAGE_ERROR%
+call %SPARK_HOME%\bin\spark-class2.cmd %CLASS% --help
+goto :eof
diff --git a/bin/utils.sh b/bin/utils.sh
deleted file mode 100755
index 748dbe345a..0000000000
--- a/bin/utils.sh
+++ /dev/null
@@ -1,60 +0,0 @@
-#!/usr/bin/env bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Gather all spark-submit options into SUBMISSION_OPTS
-function gatherSparkSubmitOpts() {
-
- if [ -z "$SUBMIT_USAGE_FUNCTION" ]; then
- echo "Function for printing usage of $0 is not set." 1>&2
- echo "Please set usage function to shell variable 'SUBMIT_USAGE_FUNCTION' in $0" 1>&2
- exit 1
- fi
-
- # NOTE: If you add or remove spark-submit options,
- # modify NOT ONLY this script but also SparkSubmitArgument.scala
- SUBMISSION_OPTS=()
- APPLICATION_OPTS=()
- while (($#)); do
- case "$1" in
- --master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \
- --conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \
- --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
- --total-executor-cores | --executor-cores | --queue | --num-executors | --archives | \
- --proxy-user)
- if [[ $# -lt 2 ]]; then
- "$SUBMIT_USAGE_FUNCTION"
- exit 1;
- fi
- SUBMISSION_OPTS+=("$1"); shift
- SUBMISSION_OPTS+=("$1"); shift
- ;;
-
- --verbose | -v | --supervise)
- SUBMISSION_OPTS+=("$1"); shift
- ;;
-
- *)
- APPLICATION_OPTS+=("$1"); shift
- ;;
- esac
- done
-
- export SUBMISSION_OPTS
- export APPLICATION_OPTS
-}
diff --git a/bin/windows-utils.cmd b/bin/windows-utils.cmd
deleted file mode 100644
index 0cf9e87ca5..0000000000
--- a/bin/windows-utils.cmd
+++ /dev/null
@@ -1,60 +0,0 @@
-rem
-rem Licensed to the Apache Software Foundation (ASF) under one or more
-rem contributor license agreements. See the NOTICE file distributed with
-rem this work for additional information regarding copyright ownership.
-rem The ASF licenses this file to You under the Apache License, Version 2.0
-rem (the "License"); you may not use this file except in compliance with
-rem the License. You may obtain a copy of the License at
-rem
-rem http://www.apache.org/licenses/LICENSE-2.0
-rem
-rem Unless required by applicable law or agreed to in writing, software
-rem distributed under the License is distributed on an "AS IS" BASIS,
-rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-rem See the License for the specific language governing permissions and
-rem limitations under the License.
-rem
-
-rem Gather all spark-submit options into SUBMISSION_OPTS
-
-set SUBMISSION_OPTS=
-set APPLICATION_OPTS=
-
-rem NOTE: If you add or remove spark-sumbmit options,
-rem modify NOT ONLY this script but also SparkSubmitArgument.scala
-
-:OptsLoop
-if "x%1"=="x" (
- goto :OptsLoopEnd
-)
-
-SET opts="\<--master\> \<--deploy-mode\> \<--class\> \<--name\> \<--jars\> \<--py-files\> \<--files\>"
-SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--driver-java-options\>"
-SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>"
-SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>"
-SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--packages\> \<--repositories\>"
-SET opts="%opts:~1,-1% \<--proxy-user\>"
-
-echo %1 | findstr %opts% >nul
-if %ERRORLEVEL% equ 0 (
- if "x%2"=="x" (
- echo "%1" requires an argument. >&2
- exit /b 1
- )
- set SUBMISSION_OPTS=%SUBMISSION_OPTS% %1 %2
- shift
- shift
- goto :OptsLoop
-)
-echo %1 | findstr "\<--verbose\> \<-v\> \<--supervise\>" >nul
-if %ERRORLEVEL% equ 0 (
- set SUBMISSION_OPTS=%SUBMISSION_OPTS% %1
- shift
- goto :OptsLoop
-)
-set APPLICATION_OPTS=%APPLICATION_OPTS% %1
-shift
-goto :OptsLoop
-
-:OptsLoopEnd
-exit /b 0
diff --git a/core/pom.xml b/core/pom.xml
index dc0d07d806..4164a3a720 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -78,6 +78,11 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
+ <artifactId>spark-launcher_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-network-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 82e66a3742..94e4bdbfb7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -18,18 +18,22 @@
package org.apache.spark.deploy
import java.net.URI
+import java.util.{List => JList}
import java.util.jar.JarFile
+import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import org.apache.spark.deploy.SparkSubmitAction._
+import org.apache.spark.launcher.SparkSubmitArgumentsParser
import org.apache.spark.util.Utils
/**
* Parses and encapsulates arguments from the spark-submit script.
* The env argument is used for testing.
*/
-private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) {
+private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
+ extends SparkSubmitArgumentsParser {
var master: String = null
var deployMode: String = null
var executorMemory: String = null
@@ -84,7 +88,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
}
// Set parameters from command line arguments
- parseOpts(args.toList)
+ try {
+ parse(args.toList)
+ } catch {
+ case e: IllegalArgumentException =>
+ SparkSubmit.printErrorAndExit(e.getMessage())
+ }
// Populate `sparkProperties` map from properties file
mergeDefaultSparkProperties()
// Use `sparkProperties` map along with env vars to fill in any missing parameters
@@ -277,167 +286,139 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
""".stripMargin
}
- /**
- * Fill in values by parsing user options.
- * NOTE: Any changes here must be reflected in YarnClientSchedulerBackend.
- */
- private def parseOpts(opts: Seq[String]): Unit = {
- val EQ_SEPARATED_OPT="""(--[^=]+)=(.+)""".r
-
- // Delineates parsing of Spark options from parsing of user options.
- parse(opts)
-
- /**
- * NOTE: If you add or remove spark-submit options,
- * modify NOT ONLY this file but also utils.sh
- */
- def parse(opts: Seq[String]): Unit = opts match {
- case ("--name") :: value :: tail =>
+ /** Fill in values by parsing user options. */
+ override protected def handle(opt: String, value: String): Boolean = {
+ opt match {
+ case NAME =>
name = value
- parse(tail)
- case ("--master") :: value :: tail =>
+ case MASTER =>
master = value
- parse(tail)
- case ("--class") :: value :: tail =>
+ case CLASS =>
mainClass = value
- parse(tail)
- case ("--deploy-mode") :: value :: tail =>
+ case DEPLOY_MODE =>
if (value != "client" && value != "cluster") {
SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
}
deployMode = value
- parse(tail)
- case ("--num-executors") :: value :: tail =>
+ case NUM_EXECUTORS =>
numExecutors = value
- parse(tail)
- case ("--total-executor-cores") :: value :: tail =>
+ case TOTAL_EXECUTOR_CORES =>
totalExecutorCores = value
- parse(tail)
- case ("--executor-cores") :: value :: tail =>
+ case EXECUTOR_CORES =>
executorCores = value
- parse(tail)
- case ("--executor-memory") :: value :: tail =>
+ case EXECUTOR_MEMORY =>
executorMemory = value
- parse(tail)
- case ("--driver-memory") :: value :: tail =>
+ case DRIVER_MEMORY =>
driverMemory = value
- parse(tail)
- case ("--driver-cores") :: value :: tail =>
+ case DRIVER_CORES =>
driverCores = value
- parse(tail)
- case ("--driver-class-path") :: value :: tail =>
+ case DRIVER_CLASS_PATH =>
driverExtraClassPath = value
- parse(tail)
- case ("--driver-java-options") :: value :: tail =>
+ case DRIVER_JAVA_OPTIONS =>
driverExtraJavaOptions = value
- parse(tail)
- case ("--driver-library-path") :: value :: tail =>
+ case DRIVER_LIBRARY_PATH =>
driverExtraLibraryPath = value
- parse(tail)
- case ("--properties-file") :: value :: tail =>
+ case PROPERTIES_FILE =>
propertiesFile = value
- parse(tail)
- case ("--kill") :: value :: tail =>
+ case KILL_SUBMISSION =>
submissionToKill = value
if (action != null) {
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $KILL.")
}
action = KILL
- parse(tail)
- case ("--status") :: value :: tail =>
+ case STATUS =>
submissionToRequestStatusFor = value
if (action != null) {
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $REQUEST_STATUS.")
}
action = REQUEST_STATUS
- parse(tail)
- case ("--supervise") :: tail =>
+ case SUPERVISE =>
supervise = true
- parse(tail)
- case ("--queue") :: value :: tail =>
+ case QUEUE =>
queue = value
- parse(tail)
- case ("--files") :: value :: tail =>
+ case FILES =>
files = Utils.resolveURIs(value)
- parse(tail)
- case ("--py-files") :: value :: tail =>
+ case PY_FILES =>
pyFiles = Utils.resolveURIs(value)
- parse(tail)
- case ("--archives") :: value :: tail =>
+ case ARCHIVES =>
archives = Utils.resolveURIs(value)
- parse(tail)
- case ("--jars") :: value :: tail =>
+ case JARS =>
jars = Utils.resolveURIs(value)
- parse(tail)
- case ("--packages") :: value :: tail =>
+ case PACKAGES =>
packages = value
- parse(tail)
- case ("--repositories") :: value :: tail =>
+ case REPOSITORIES =>
repositories = value
- parse(tail)
- case ("--conf" | "-c") :: value :: tail =>
+ case CONF =>
value.split("=", 2).toSeq match {
case Seq(k, v) => sparkProperties(k) = v
case _ => SparkSubmit.printErrorAndExit(s"Spark config without '=': $value")
}
- parse(tail)
- case ("--proxy-user") :: value :: tail =>
+ case PROXY_USER =>
proxyUser = value
- parse(tail)
- case ("--help" | "-h") :: tail =>
+ case HELP =>
printUsageAndExit(0)
- case ("--verbose" | "-v") :: tail =>
+ case VERBOSE =>
verbose = true
- parse(tail)
- case ("--version") :: tail =>
+ case VERSION =>
SparkSubmit.printVersionAndExit()
- case EQ_SEPARATED_OPT(opt, value) :: tail =>
- parse(opt :: value :: tail)
+ case _ =>
+ throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
+ }
+ true
+ }
- case value :: tail if value.startsWith("-") =>
- SparkSubmit.printErrorAndExit(s"Unrecognized option '$value'.")
+ /**
+ * Handle unrecognized command line options.
+ *
+ * The first unrecognized option is treated as the "primary resource". Everything else is
+ * treated as application arguments.
+ */
+ override protected def handleUnknown(opt: String): Boolean = {
+ if (opt.startsWith("-")) {
+ SparkSubmit.printErrorAndExit(s"Unrecognized option '$opt'.")
+ }
- case value :: tail =>
- primaryResource =
- if (!SparkSubmit.isShell(value) && !SparkSubmit.isInternal(value)) {
- Utils.resolveURI(value).toString
- } else {
- value
- }
- isPython = SparkSubmit.isPython(value)
- childArgs ++= tail
+ primaryResource =
+ if (!SparkSubmit.isShell(opt) && !SparkSubmit.isInternal(opt)) {
+ Utils.resolveURI(opt).toString
+ } else {
+ opt
+ }
+ isPython = SparkSubmit.isPython(opt)
+ false
+ }
- case Nil =>
- }
+ override protected def handleExtraArgs(extra: JList[String]): Unit = {
+ childArgs ++= extra
}
private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
deleted file mode 100644
index 311048cdaa..0000000000
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy
-
-import scala.collection.JavaConversions._
-
-import org.apache.spark.util.{RedirectThread, Utils}
-
-/**
- * Launch an application through Spark submit in client mode with the appropriate classpath,
- * library paths, java options and memory. These properties of the JVM must be set before the
- * driver JVM is launched. The sole purpose of this class is to avoid handling the complexity
- * of parsing the properties file for such relevant configs in Bash.
- *
- * Usage: org.apache.spark.deploy.SparkSubmitDriverBootstrapper <submit args>
- */
-private[spark] object SparkSubmitDriverBootstrapper {
-
- // Note: This class depends on the behavior of `bin/spark-class` and `bin/spark-submit`.
- // Any changes made there must be reflected in this file.
-
- def main(args: Array[String]): Unit = {
-
- // This should be called only from `bin/spark-class`
- if (!sys.env.contains("SPARK_CLASS")) {
- System.err.println("SparkSubmitDriverBootstrapper must be called from `bin/spark-class`!")
- System.exit(1)
- }
-
- val submitArgs = args
- val runner = sys.env("RUNNER")
- val classpath = sys.env("CLASSPATH")
- val javaOpts = sys.env("JAVA_OPTS")
- val defaultDriverMemory = sys.env("OUR_JAVA_MEM")
-
- // Spark submit specific environment variables
- val deployMode = sys.env("SPARK_SUBMIT_DEPLOY_MODE")
- val propertiesFile = sys.env("SPARK_SUBMIT_PROPERTIES_FILE")
- val bootstrapDriver = sys.env("SPARK_SUBMIT_BOOTSTRAP_DRIVER")
- val submitDriverMemory = sys.env.get("SPARK_SUBMIT_DRIVER_MEMORY")
- val submitLibraryPath = sys.env.get("SPARK_SUBMIT_LIBRARY_PATH")
- val submitClasspath = sys.env.get("SPARK_SUBMIT_CLASSPATH")
- val submitJavaOpts = sys.env.get("SPARK_SUBMIT_OPTS")
-
- assume(runner != null, "RUNNER must be set")
- assume(classpath != null, "CLASSPATH must be set")
- assume(javaOpts != null, "JAVA_OPTS must be set")
- assume(defaultDriverMemory != null, "OUR_JAVA_MEM must be set")
- assume(deployMode == "client", "SPARK_SUBMIT_DEPLOY_MODE must be \"client\"!")
- assume(propertiesFile != null, "SPARK_SUBMIT_PROPERTIES_FILE must be set")
- assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set")
-
- // Parse the properties file for the equivalent spark.driver.* configs
- val properties = Utils.getPropertiesFromFile(propertiesFile)
- val confDriverMemory = properties.get("spark.driver.memory")
- val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
- val confClasspath = properties.get("spark.driver.extraClassPath")
- val confJavaOpts = properties.get("spark.driver.extraJavaOptions")
-
- // Favor Spark submit arguments over the equivalent configs in the properties file.
- // Note that we do not actually use the Spark submit values for library path, classpath,
- // and Java opts here, because we have already captured them in Bash.
-
- val newDriverMemory = submitDriverMemory
- .orElse(confDriverMemory)
- .getOrElse(defaultDriverMemory)
-
- val newClasspath =
- if (submitClasspath.isDefined) {
- classpath
- } else {
- classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("")
- }
-
- val newJavaOpts =
- if (submitJavaOpts.isDefined) {
- // SPARK_SUBMIT_OPTS is already captured in JAVA_OPTS
- javaOpts
- } else {
- javaOpts + confJavaOpts.map(" " + _).getOrElse("")
- }
-
- val filteredJavaOpts = Utils.splitCommandString(newJavaOpts)
- .filterNot(_.startsWith("-Xms"))
- .filterNot(_.startsWith("-Xmx"))
-
- // Build up command
- val command: Seq[String] =
- Seq(runner) ++
- Seq("-cp", newClasspath) ++
- filteredJavaOpts ++
- Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++
- Seq("org.apache.spark.deploy.SparkSubmit") ++
- submitArgs
-
- // Print the launch command. This follows closely the format used in `bin/spark-class`.
- if (sys.env.contains("SPARK_PRINT_LAUNCH_COMMAND")) {
- System.err.print("Spark Command: ")
- System.err.println(command.mkString(" "))
- System.err.println("========================================\n")
- }
-
- // Start the driver JVM
- val filteredCommand = command.filter(_.nonEmpty)
- val builder = new ProcessBuilder(filteredCommand)
- val env = builder.environment()
-
- if (submitLibraryPath.isEmpty && confLibraryPath.nonEmpty) {
- val libraryPaths = confLibraryPath ++ sys.env.get(Utils.libraryPathEnvName)
- env.put(Utils.libraryPathEnvName, libraryPaths.mkString(sys.props("path.separator")))
- }
-
- val process = builder.start()
-
- // If we kill an app while it's running, its sub-process should be killed too.
- Runtime.getRuntime().addShutdownHook(new Thread() {
- override def run() = {
- if (process != null) {
- process.destroy()
- process.waitFor()
- }
- }
- })
-
- // Redirect stdout and stderr from the child JVM
- val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
- val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
- stdoutThread.start()
- stderrThread.start()
-
- // Redirect stdin to child JVM only if we're not running Windows. This is because the
- // subprocess there already reads directly from our stdin, so we should avoid spawning a
- // thread that contends with the subprocess in reading from System.in.
- val isWindows = Utils.isWindows
- val isSubprocess = sys.env.contains("IS_SUBPROCESS")
- if (!isWindows) {
- val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin",
- propagateEof = true)
- stdinThread.start()
- // Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on
- // broken pipe, signaling that the parent process has exited. This is the case if the
- // application is launched directly from python, as in the PySpark shell. In Windows,
- // the termination logic is handled in java_gateway.py
- if (isSubprocess) {
- stdinThread.join()
- process.destroy()
- }
- }
- val returnCode = process.waitFor()
- stdoutThread.join()
- stderrThread.join()
- sys.exit(returnCode)
- }
-
-}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 3e013c3209..83f78cf473 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -20,10 +20,12 @@ package org.apache.spark.deploy.worker
import java.io.{File, FileOutputStream, InputStream, IOException}
import java.lang.System._
+import scala.collection.JavaConversions._
import scala.collection.Map
import org.apache.spark.Logging
import org.apache.spark.deploy.Command
+import org.apache.spark.launcher.WorkerCommandBuilder
import org.apache.spark.util.Utils
/**
@@ -54,12 +56,10 @@ object CommandUtils extends Logging {
}
private def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
- val runner = sys.env.get("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
-
// SPARK-698: do not call the run.cmd script, as process.destroy()
// fails to kill a process tree on Windows
- Seq(runner) ++ buildJavaOpts(command, memory, sparkHome) ++ Seq(command.mainClass) ++
- command.arguments
+ val cmd = new WorkerCommandBuilder(sparkHome, memory, command).buildCommand()
+ cmd.toSeq ++ Seq(command.mainClass) ++ command.arguments
}
/**
@@ -92,44 +92,6 @@ object CommandUtils extends Logging {
command.javaOpts)
}
- /**
- * Attention: this must always be aligned with the environment variables in the run scripts and
- * the way the JAVA_OPTS are assembled there.
- */
- private def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
- val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
-
- // Exists for backwards compatibility with older Spark versions
- val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString)
- .getOrElse(Nil)
- if (workerLocalOpts.length > 0) {
- logWarning("SPARK_JAVA_OPTS was set on the worker. It is deprecated in Spark 1.0.")
- logWarning("Set SPARK_LOCAL_DIRS for node-specific storage locations.")
- }
-
- // Figure out our classpath with the external compute-classpath script
- val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
- val classPath = Utils.executeAndGetOutput(
- Seq(sparkHome + "/bin/compute-classpath" + ext),
- extraEnvironment = command.environment)
- val userClassPath = command.classPathEntries ++ Seq(classPath)
-
- val javaVersion = System.getProperty("java.version")
-
- val javaOpts = workerLocalOpts ++ command.javaOpts
-
- val permGenOpt =
- if (!javaVersion.startsWith("1.8") && !javaOpts.exists(_.startsWith("-XX:MaxPermSize="))) {
- // do not specify -XX:MaxPermSize if it was already specified by user
- Some("-XX:MaxPermSize=128m")
- } else {
- None
- }
-
- Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
- permGenOpt ++ javaOpts ++ memoryOpts
- }
-
/** Spawn a thread that will redirect a given stream to a file */
def redirectStream(in: InputStream, file: File) {
val out = new FileOutputStream(file, true)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index bed0a08d4d..a897e53218 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -49,7 +49,6 @@ private[spark] class Executor(
isLocal: Boolean = false)
extends Logging
{
-
logInfo(s"Starting executor ID $executorId on host $executorHostname")
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
diff --git a/core/src/main/scala/org/apache/spark/launcher/SparkSubmitArgumentsParser.scala b/core/src/main/scala/org/apache/spark/launcher/SparkSubmitArgumentsParser.scala
new file mode 100644
index 0000000000..a835012531
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/launcher/SparkSubmitArgumentsParser.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+/**
+ * This class makes SparkSubmitOptionParser visible for Spark code outside of the `launcher`
+ * package, since Java doesn't have a feature similar to `private[spark]`, and we don't want
+ * that class to be public.
+ */
+private[spark] abstract class SparkSubmitArgumentsParser extends SparkSubmitOptionParser
diff --git a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
new file mode 100644
index 0000000000..9be98723ae
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+import java.io.File
+import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.deploy.Command
+
+/**
+ * This class is used by CommandUtils. It uses some package-private APIs in SparkLauncher, and since
+ * Java doesn't have a feature similar to `private[spark]`, and we don't want that class to be
+ * public, needs to live in the same package as the rest of the library.
+ */
+private[spark] class WorkerCommandBuilder(sparkHome: String, memoryMb: Int, command: Command)
+ extends AbstractCommandBuilder {
+
+ childEnv.putAll(command.environment)
+ childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, sparkHome)
+
+ override def buildCommand(env: JMap[String, String]): JList[String] = {
+ val cmd = buildJavaCommand(command.classPathEntries.mkString(File.pathSeparator))
+ cmd.add(s"-Xms${memoryMb}M")
+ cmd.add(s"-Xmx${memoryMb}M")
+ command.javaOpts.foreach(cmd.add)
+ addPermGenSizeOpt(cmd)
+ addOptionString(cmd, getenv("SPARK_JAVA_OPTS"))
+ cmd
+ }
+
+ def buildCommand(): JList[String] = buildCommand(new JHashMap[String, String]())
+
+}
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index b5e04bd0c6..fa0b4e3705 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1369,6 +1369,11 @@ The [application submission guide](submitting-applications.html) describes how t
In short, once you package your application into a JAR (for Java/Scala) or a set of `.py` or `.zip` files (for Python),
the `bin/spark-submit` script lets you submit it to any supported cluster manager.
+# Launching Spark jobs from Java / Scala
+
+The [org.apache.spark.launcher](api/java/index.html?org/apache/spark/launcher/package-summary.html)
+package provides classes for launching Spark jobs as child processes using a simple Java API.
+
# Unit Testing
Spark is friendly to unit testing with any popular unit test framework.
diff --git a/launcher/pom.xml b/launcher/pom.xml
new file mode 100644
index 0000000000..ccbd9d0419
--- /dev/null
+++ b/launcher/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.10</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-launcher_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Launcher Project</name>
+ <url>http://spark.apache.org/</url>
+ <properties>
+ <sbt.project.name>launcher</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <!-- NOTE: only test-scope dependencies are allowed in this module. -->
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Not needed by the test code, but referenced by SparkSubmit which is used by the tests. -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+</project>
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
new file mode 100644
index 0000000000..dc90e9e987
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.jar.JarFile;
+import java.util.regex.Pattern;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Abstract Spark command builder that defines common functionality.
+ */
+abstract class AbstractCommandBuilder {
+
+ boolean verbose;
+ String appName;
+ String appResource;
+ String deployMode;
+ String javaHome;
+ String mainClass;
+ String master;
+ String propertiesFile;
+ final List<String> appArgs;
+ final List<String> jars;
+ final List<String> files;
+ final List<String> pyFiles;
+ final Map<String, String> childEnv;
+ final Map<String, String> conf;
+
+ public AbstractCommandBuilder() {
+ this.appArgs = new ArrayList<String>();
+ this.childEnv = new HashMap<String, String>();
+ this.conf = new HashMap<String, String>();
+ this.files = new ArrayList<String>();
+ this.jars = new ArrayList<String>();
+ this.pyFiles = new ArrayList<String>();
+ }
+
+ /**
+ * Builds the command to execute.
+ *
+ * @param env A map containing environment variables for the child process. It may already contain
+ * entries defined by the user (such as SPARK_HOME, or those defined by the
+ * SparkLauncher constructor that takes an environment), and may be modified to
+ * include other variables needed by the process to be executed.
+ */
+ abstract List<String> buildCommand(Map<String, String> env) throws IOException;
+
+ /**
+ * Builds a list of arguments to run java.
+ *
+ * This method finds the java executable to use and appends JVM-specific options for running a
+ * class with Spark in the classpath. It also loads options from the "java-opts" file in the
+ * configuration directory being used.
+ *
+ * Callers should still add at least the class to run, as well as any arguments to pass to the
+ * class.
+ */
+ List<String> buildJavaCommand(String extraClassPath) throws IOException {
+ List<String> cmd = new ArrayList<String>();
+ if (javaHome == null) {
+ cmd.add(join(File.separator, System.getProperty("java.home"), "bin", "java"));
+ } else {
+ cmd.add(join(File.separator, javaHome, "bin", "java"));
+ }
+
+ // Load extra JAVA_OPTS from conf/java-opts, if it exists.
+ File javaOpts = new File(join(File.separator, getConfDir(), "java-opts"));
+ if (javaOpts.isFile()) {
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ new FileInputStream(javaOpts), "UTF-8"));
+ try {
+ String line;
+ while ((line = br.readLine()) != null) {
+ addOptionString(cmd, line);
+ }
+ } finally {
+ br.close();
+ }
+ }
+
+ cmd.add("-cp");
+ cmd.add(join(File.pathSeparator, buildClassPath(extraClassPath)));
+ return cmd;
+ }
+
+ /**
+ * Adds the default perm gen size option for Spark if the VM requires it and the user hasn't
+ * set it.
+ */
+ void addPermGenSizeOpt(List<String> cmd) {
+ // Don't set MaxPermSize for Java 8 and later.
+ String[] version = System.getProperty("java.version").split("\\.");
+ if (Integer.parseInt(version[0]) > 1 || Integer.parseInt(version[1]) > 7) {
+ return;
+ }
+
+ for (String arg : cmd) {
+ if (arg.startsWith("-XX:MaxPermSize=")) {
+ return;
+ }
+ }
+
+ cmd.add("-XX:MaxPermSize=128m");
+ }
+
+ void addOptionString(List<String> cmd, String options) {
+ if (!isEmpty(options)) {
+ for (String opt : parseOptionString(options)) {
+ cmd.add(opt);
+ }
+ }
+ }
+
+ /**
+ * Builds the classpath for the application. Returns a list with one classpath entry per element;
+ * each entry is formatted in the way expected by <i>java.net.URLClassLoader</i> (more
+ * specifically, with trailing slashes for directories).
+ */
+ List<String> buildClassPath(String appClassPath) throws IOException {
+ String sparkHome = getSparkHome();
+ String scala = getScalaVersion();
+
+ List<String> cp = new ArrayList<String>();
+ addToClassPath(cp, getenv("SPARK_CLASSPATH"));
+ addToClassPath(cp, appClassPath);
+
+ addToClassPath(cp, getConfDir());
+
+ boolean prependClasses = !isEmpty(getenv("SPARK_PREPEND_CLASSES"));
+ boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
+ if (prependClasses || isTesting) {
+ List<String> projects = Arrays.asList("core", "repl", "mllib", "bagel", "graphx",
+ "streaming", "tools", "sql/catalyst", "sql/core", "sql/hive", "sql/hive-thriftserver",
+ "yarn", "launcher");
+ if (prependClasses) {
+ System.err.println(
+ "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of " +
+ "assembly.");
+ for (String project : projects) {
+ addToClassPath(cp, String.format("%s/%s/target/scala-%s/classes", sparkHome, project,
+ scala));
+ }
+ }
+ if (isTesting) {
+ for (String project : projects) {
+ addToClassPath(cp, String.format("%s/%s/target/scala-%s/test-classes", sparkHome,
+ project, scala));
+ }
+ }
+
+ // Add this path to include jars that are shaded in the final deliverable created during
+ // the maven build. These jars are copied to this directory during the build.
+ addToClassPath(cp, String.format("%s/core/target/jars/*", sparkHome));
+ }
+
+ String assembly = findAssembly(scala);
+ addToClassPath(cp, assembly);
+
+ // When Hive support is needed, Datanucleus jars must be included on the classpath. Datanucleus
+ // jars do not work if only included in the uber jar as plugin.xml metadata is lost. Both sbt
+ // and maven will populate "lib_managed/jars/" with the datanucleus jars when Spark is built
+ // with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark
+ // assembly is built for Hive, before actually populating the CLASSPATH with the jars.
+ //
+ // This block also serves as a check for SPARK-1703, when the assembly jar is built with
+ // Java 7 and ends up with too many files, causing issues with other JDK versions.
+ boolean needsDataNucleus = false;
+ JarFile assemblyJar = null;
+ try {
+ assemblyJar = new JarFile(assembly);
+ needsDataNucleus = assemblyJar.getEntry("org/apache/hadoop/hive/ql/exec/") != null;
+ } catch (IOException ioe) {
+ if (ioe.getMessage().indexOf("invalid CEN header") >= 0) {
+ System.err.println(
+ "Loading Spark jar failed.\n" +
+ "This is likely because Spark was compiled with Java 7 and run\n" +
+ "with Java 6 (see SPARK-1703). Please use Java 7 to run Spark\n" +
+ "or build Spark with Java 6.");
+ System.exit(1);
+ } else {
+ throw ioe;
+ }
+ } finally {
+ if (assemblyJar != null) {
+ try {
+ assemblyJar.close();
+ } catch (IOException e) {
+ // Ignore.
+ }
+ }
+ }
+
+ if (needsDataNucleus) {
+ System.err.println("Spark assembly has been built with Hive, including Datanucleus jars " +
+ "in classpath.");
+ File libdir;
+ if (new File(sparkHome, "RELEASE").isFile()) {
+ libdir = new File(sparkHome, "lib");
+ } else {
+ libdir = new File(sparkHome, "lib_managed/jars");
+ }
+
+ checkState(libdir.isDirectory(), "Library directory '%s' does not exist.",
+ libdir.getAbsolutePath());
+ for (File jar : libdir.listFiles()) {
+ if (jar.getName().startsWith("datanucleus-")) {
+ addToClassPath(cp, jar.getAbsolutePath());
+ }
+ }
+ }
+
+ addToClassPath(cp, getenv("HADOOP_CONF_DIR"));
+ addToClassPath(cp, getenv("YARN_CONF_DIR"));
+ addToClassPath(cp, getenv("SPARK_DIST_CLASSPATH"));
+ return cp;
+ }
+
+ /**
+ * Adds entries to the classpath.
+ *
+ * @param cp List to which the new entries are appended.
+ * @param entries New classpath entries (separated by File.pathSeparator).
+ */
+ private void addToClassPath(List<String> cp, String entries) {
+ if (isEmpty(entries)) {
+ return;
+ }
+ String[] split = entries.split(Pattern.quote(File.pathSeparator));
+ for (String entry : split) {
+ if (!isEmpty(entry)) {
+ if (new File(entry).isDirectory() && !entry.endsWith(File.separator)) {
+ entry += File.separator;
+ }
+ cp.add(entry);
+ }
+ }
+ }
+
+ String getScalaVersion() {
+ String scala = getenv("SPARK_SCALA_VERSION");
+ if (scala != null) {
+ return scala;
+ }
+
+ String sparkHome = getSparkHome();
+ File scala210 = new File(sparkHome, "assembly/target/scala-2.10");
+ File scala211 = new File(sparkHome, "assembly/target/scala-2.11");
+ checkState(!scala210.isDirectory() || !scala211.isDirectory(),
+ "Presence of build for both scala versions (2.10 and 2.11) detected.\n" +
+ "Either clean one of them or set SPARK_SCALA_VERSION in your environment.");
+ if (scala210.isDirectory()) {
+ return "2.10";
+ } else {
+ checkState(scala211.isDirectory(), "Cannot find any assembly build directories.");
+ return "2.11";
+ }
+ }
+
+ String getSparkHome() {
+ String path = getenv(ENV_SPARK_HOME);
+ checkState(path != null,
+ "Spark home not found; set it explicitly or use the SPARK_HOME environment variable.");
+ return path;
+ }
+
+ /**
+ * Loads the configuration file for the application, if it exists. This is either the
+ * user-specified properties file, or the spark-defaults.conf file under the Spark configuration
+ * directory.
+ */
+ Properties loadPropertiesFile() throws IOException {
+ Properties props = new Properties();
+ File propsFile;
+ if (propertiesFile != null) {
+ propsFile = new File(propertiesFile);
+ checkArgument(propsFile.isFile(), "Invalid properties file '%s'.", propertiesFile);
+ } else {
+ propsFile = new File(getConfDir(), DEFAULT_PROPERTIES_FILE);
+ }
+
+ if (propsFile.isFile()) {
+ FileInputStream fd = null;
+ try {
+ fd = new FileInputStream(propsFile);
+ props.load(new InputStreamReader(fd, "UTF-8"));
+ } finally {
+ if (fd != null) {
+ try {
+ fd.close();
+ } catch (IOException e) {
+ // Ignore.
+ }
+ }
+ }
+ }
+
+ return props;
+ }
+
+ String getenv(String key) {
+ return firstNonEmpty(childEnv.get(key), System.getenv(key));
+ }
+
+ private String findAssembly(String scalaVersion) {
+ String sparkHome = getSparkHome();
+ File libdir;
+ if (new File(sparkHome, "RELEASE").isFile()) {
+ libdir = new File(sparkHome, "lib");
+ checkState(libdir.isDirectory(), "Library directory '%s' does not exist.",
+ libdir.getAbsolutePath());
+ } else {
+ libdir = new File(sparkHome, String.format("assembly/target/scala-%s", scalaVersion));
+ }
+
+ final Pattern re = Pattern.compile("spark-assembly.*hadoop.*\\.jar");
+ FileFilter filter = new FileFilter() {
+ @Override
+ public boolean accept(File file) {
+ return file.isFile() && re.matcher(file.getName()).matches();
+ }
+ };
+ File[] assemblies = libdir.listFiles(filter);
+ checkState(assemblies != null && assemblies.length > 0, "No assemblies found in '%s'.", libdir);
+ checkState(assemblies.length == 1, "Multiple assemblies found in '%s'.", libdir);
+ return assemblies[0].getAbsolutePath();
+ }
+
+ private String getConfDir() {
+ String confDir = getenv("SPARK_CONF_DIR");
+ return confDir != null ? confDir : join(File.separator, getSparkHome(), "conf");
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
new file mode 100644
index 0000000000..9b04732afe
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper methods for command builders.
+ */
+class CommandBuilderUtils {
+
+ static final String DEFAULT_MEM = "512m";
+ static final String DEFAULT_PROPERTIES_FILE = "spark-defaults.conf";
+ static final String ENV_SPARK_HOME = "SPARK_HOME";
+
+ /** Returns whether the given string is null or empty. */
+ static boolean isEmpty(String s) {
+ return s == null || s.isEmpty();
+ }
+
+ /** Joins a list of strings using the given separator. */
+ static String join(String sep, String... elements) {
+ StringBuilder sb = new StringBuilder();
+ for (String e : elements) {
+ if (e != null) {
+ if (sb.length() > 0) {
+ sb.append(sep);
+ }
+ sb.append(e);
+ }
+ }
+ return sb.toString();
+ }
+
+ /** Joins a list of strings using the given separator. */
+ static String join(String sep, Iterable<String> elements) {
+ StringBuilder sb = new StringBuilder();
+ for (String e : elements) {
+ if (e != null) {
+ if (sb.length() > 0) {
+ sb.append(sep);
+ }
+ sb.append(e);
+ }
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Returns the first non-empty value mapped to the given key in the given maps, or null otherwise.
+ */
+ static String firstNonEmptyValue(String key, Map<?, ?>... maps) {
+ for (Map<?, ?> map : maps) {
+ String value = (String) map.get(key);
+ if (!isEmpty(value)) {
+ return value;
+ }
+ }
+ return null;
+ }
+
+ /** Returns the first non-empty, non-null string in the given list, or null otherwise. */
+ static String firstNonEmpty(String... candidates) {
+ for (String s : candidates) {
+ if (!isEmpty(s)) {
+ return s;
+ }
+ }
+ return null;
+ }
+
+ /** Returns the name of the env variable that holds the native library path. */
+ static String getLibPathEnvName() {
+ if (isWindows()) {
+ return "PATH";
+ }
+
+ String os = System.getProperty("os.name");
+ if (os.startsWith("Mac OS X")) {
+ return "DYLD_LIBRARY_PATH";
+ } else {
+ return "LD_LIBRARY_PATH";
+ }
+ }
+
+ /** Returns whether the OS is Windows. */
+ static boolean isWindows() {
+ String os = System.getProperty("os.name");
+ return os.startsWith("Windows");
+ }
+
+ /**
+ * Updates the user environment, appending the given pathList to the existing value of the given
+ * environment variable (or setting it if it hasn't yet been set).
+ */
+ static void mergeEnvPathList(Map<String, String> userEnv, String envKey, String pathList) {
+ if (!isEmpty(pathList)) {
+ String current = firstNonEmpty(userEnv.get(envKey), System.getenv(envKey));
+ userEnv.put(envKey, join(File.pathSeparator, current, pathList));
+ }
+ }
+
+ /**
+ * Parse a string as if it were a list of arguments, following bash semantics.
+ * For example:
+ *
+ * Input: "\"ab cd\" efgh 'i \" j'"
+ * Output: [ "ab cd", "efgh", "i \" j" ]
+ */
+ static List<String> parseOptionString(String s) {
+ List<String> opts = new ArrayList<String>();
+ StringBuilder opt = new StringBuilder();
+ boolean inOpt = false;
+ boolean inSingleQuote = false;
+ boolean inDoubleQuote = false;
+ boolean escapeNext = false;
+
+ // This is needed to detect when a quoted empty string is used as an argument ("" or '').
+ boolean hasData = false;
+
+ for (int i = 0; i < s.length(); i++) {
+ int c = s.codePointAt(i);
+ if (escapeNext) {
+ opt.appendCodePoint(c);
+ escapeNext = false;
+ } else if (inOpt) {
+ switch (c) {
+ case '\\':
+ if (inSingleQuote) {
+ opt.appendCodePoint(c);
+ } else {
+ escapeNext = true;
+ }
+ break;
+ case '\'':
+ if (inDoubleQuote) {
+ opt.appendCodePoint(c);
+ } else {
+ inSingleQuote = !inSingleQuote;
+ }
+ break;
+ case '"':
+ if (inSingleQuote) {
+ opt.appendCodePoint(c);
+ } else {
+ inDoubleQuote = !inDoubleQuote;
+ }
+ break;
+ default:
+ if (!Character.isWhitespace(c) || inSingleQuote || inDoubleQuote) {
+ opt.appendCodePoint(c);
+ } else {
+ opts.add(opt.toString());
+ opt.setLength(0);
+ inOpt = false;
+ hasData = false;
+ }
+ }
+ } else {
+ switch (c) {
+ case '\'':
+ inSingleQuote = true;
+ inOpt = true;
+ hasData = true;
+ break;
+ case '"':
+ inDoubleQuote = true;
+ inOpt = true;
+ hasData = true;
+ break;
+ case '\\':
+ escapeNext = true;
+ inOpt = true;
+ hasData = true;
+ break;
+ default:
+ if (!Character.isWhitespace(c)) {
+ inOpt = true;
+ hasData = true;
+ opt.appendCodePoint(c);
+ }
+ }
+ }
+ }
+
+ checkArgument(!inSingleQuote && !inDoubleQuote && !escapeNext, "Invalid option string: %s", s);
+ if (hasData) {
+ opts.add(opt.toString());
+ }
+ return opts;
+ }
+
+ /** Throws IllegalArgumentException if the given object is null. */
+ static void checkNotNull(Object o, String arg) {
+ if (o == null) {
+ throw new IllegalArgumentException(String.format("'%s' must not be null.", arg));
+ }
+ }
+
+ /** Throws IllegalArgumentException with the given message if the check is false. */
+ static void checkArgument(boolean check, String msg, Object... args) {
+ if (!check) {
+ throw new IllegalArgumentException(String.format(msg, args));
+ }
+ }
+
+ /** Throws IllegalStateException with the given message if the check is false. */
+ static void checkState(boolean check, String msg, Object... args) {
+ if (!check) {
+ throw new IllegalStateException(String.format(msg, args));
+ }
+ }
+
+ /**
+ * Quote a command argument for a command to be run by a Windows batch script, if the argument
+ * needs quoting. Arguments only seem to need quotes in batch scripts if they have certain
+ * special characters, some of which need extra (and different) escaping.
+ *
+ * For example:
+ * original single argument: ab="cde fgh"
+ * quoted: "ab^=""cde fgh"""
+ */
+ static String quoteForBatchScript(String arg) {
+
+ boolean needsQuotes = false;
+ for (int i = 0; i < arg.length(); i++) {
+ int c = arg.codePointAt(i);
+ if (Character.isWhitespace(c) || c == '"' || c == '=') {
+ needsQuotes = true;
+ break;
+ }
+ }
+ if (!needsQuotes) {
+ return arg;
+ }
+ StringBuilder quoted = new StringBuilder();
+ quoted.append("\"");
+ for (int i = 0; i < arg.length(); i++) {
+ int cp = arg.codePointAt(i);
+ switch (cp) {
+ case '"':
+ quoted.append('"');
+ break;
+
+ case '=':
+ quoted.append('^');
+ break;
+
+ default:
+ break;
+ }
+ quoted.appendCodePoint(cp);
+ }
+ quoted.append("\"");
+ return quoted.toString();
+ }
+
+ /**
+ * Quotes a string so that it can be used in a command string and be parsed back into a single
+ * argument by python's "shlex.split()" function.
+ *
+ * Basically, just add simple escapes. E.g.:
+ * original single argument : ab "cd" ef
+ * after: "ab \"cd\" ef"
+ */
+ static String quoteForPython(String s) {
+ StringBuilder quoted = new StringBuilder().append('"');
+ for (int i = 0; i < s.length(); i++) {
+ int cp = s.codePointAt(i);
+ if (cp == '"' || cp == '\\') {
+ quoted.appendCodePoint('\\');
+ }
+ quoted.appendCodePoint(cp);
+ }
+ return quoted.append('"').toString();
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/Main.java b/launcher/src/main/java/org/apache/spark/launcher/Main.java
new file mode 100644
index 0000000000..206acfb514
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/Main.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Command line interface for the Spark launcher. Used internally by Spark scripts.
+ */
+class Main {
+
+ /**
+ * Usage: Main [class] [class args]
+ * <p/>
+ * This CLI works in two different modes:
+ * <ul>
+ * <li>"spark-submit": if <i>class</i> is "org.apache.spark.deploy.SparkSubmit", the
+ * {@link SparkLauncher} class is used to launch a Spark application.</li>
+ * <li>"spark-class": if another class is provided, an internal Spark class is run.</li>
+ * </ul>
+ *
+ * This class works in tandem with the "bin/spark-class" script on Unix-like systems, and
+ * "bin/spark-class2.cmd" batch script on Windows to execute the final command.
+ * <p/>
+ * On Unix-like systems, the output is a list of command arguments, separated by the NULL
+ * character. On Windows, the output is a command line suitable for direct execution from the
+ * script.
+ */
+ public static void main(String[] argsArray) throws Exception {
+ checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
+
+ List<String> args = new ArrayList<String>(Arrays.asList(argsArray));
+ String className = args.remove(0);
+
+ boolean printLaunchCommand;
+ boolean printUsage;
+ AbstractCommandBuilder builder;
+ try {
+ if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
+ builder = new SparkSubmitCommandBuilder(args);
+ } else {
+ builder = new SparkClassCommandBuilder(className, args);
+ }
+ printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
+ printUsage = false;
+ } catch (IllegalArgumentException e) {
+ builder = new UsageCommandBuilder(e.getMessage());
+ printLaunchCommand = false;
+ printUsage = true;
+ }
+
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> cmd = builder.buildCommand(env);
+ if (printLaunchCommand) {
+ System.err.println("Spark Command: " + join(" ", cmd));
+ System.err.println("========================================");
+ }
+
+ if (isWindows()) {
+ // When printing the usage message, we can't use "cmd /v" since that prevents the env
+ // variable from being seen in the caller script. So do not call prepareWindowsCommand().
+ if (printUsage) {
+ System.out.println(join(" ", cmd));
+ } else {
+ System.out.println(prepareWindowsCommand(cmd, env));
+ }
+ } else {
+ // In bash, use NULL as the arg separator since it cannot be used in an argument.
+ List<String> bashCmd = prepareBashCommand(cmd, env);
+ for (String c : bashCmd) {
+ System.out.print(c);
+ System.out.print('\0');
+ }
+ }
+ }
+
+ /**
+ * Prepare a command line for execution from a Windows batch script.
+ *
+ * The method quotes all arguments so that spaces are handled as expected. Quotes within arguments
+ * are "double quoted" (which is batch for escaping a quote). This page has more details about
+ * quoting and other batch script fun stuff: http://ss64.com/nt/syntax-esc.html
+ *
+ * The command is executed using "cmd /c" and formatted in single line, since that's the
+ * easiest way to consume this from a batch script (see spark-class2.cmd).
+ */
+ private static String prepareWindowsCommand(List<String> cmd, Map<String, String> childEnv) {
+ StringBuilder cmdline = new StringBuilder("cmd /c \"");
+ for (Map.Entry<String, String> e : childEnv.entrySet()) {
+ cmdline.append(String.format("set %s=%s", e.getKey(), e.getValue()));
+ cmdline.append(" && ");
+ }
+ for (String arg : cmd) {
+ cmdline.append(quoteForBatchScript(arg));
+ cmdline.append(" ");
+ }
+ cmdline.append("\"");
+ return cmdline.toString();
+ }
+
+ /**
+ * Prepare the command for execution from a bash script. The final command will have commands to
+ * set up any needed environment variables needed by the child process.
+ */
+ private static List<String> prepareBashCommand(List<String> cmd, Map<String, String> childEnv) {
+ if (childEnv.isEmpty()) {
+ return cmd;
+ }
+
+ List<String> newCmd = new ArrayList<String>();
+ newCmd.add("env");
+
+ for (Map.Entry<String, String> e : childEnv.entrySet()) {
+ newCmd.add(String.format("%s=%s", e.getKey(), e.getValue()));
+ }
+ newCmd.addAll(cmd);
+ return newCmd;
+ }
+
+ /**
+ * Internal builder used when command line parsing fails. This will behave differently depending
+ * on the platform:
+ *
+ * - On Unix-like systems, it will print a call to the "usage" function with two arguments: the
+ * the error string, and the exit code to use. The function is expected to print the command's
+ * usage and exit with the provided exit code. The script should use "export -f usage" after
+ * declaring a function called "usage", so that the function is available to downstream scripts.
+ *
+ * - On Windows it will set the variable "SPARK_LAUNCHER_USAGE_ERROR" to the usage error message.
+ * The batch script should check for this variable and print its usage, since batch scripts
+ * don't really support the "export -f" functionality used in bash.
+ */
+ private static class UsageCommandBuilder extends AbstractCommandBuilder {
+
+ private final String message;
+
+ UsageCommandBuilder(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public List<String> buildCommand(Map<String, String> env) {
+ if (isWindows()) {
+ return Arrays.asList("set", "SPARK_LAUNCHER_USAGE_ERROR=" + message);
+ } else {
+ return Arrays.asList("usage", message, "1");
+ }
+ }
+
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
new file mode 100644
index 0000000000..e601a0a19f
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Command builder for internal Spark classes.
+ * <p/>
+ * This class handles building the command to launch all internal Spark classes except for
+ * SparkSubmit (which is handled by {@link SparkSubmitCommandBuilder} class.
+ */
+class SparkClassCommandBuilder extends AbstractCommandBuilder {
+
+ private final String className;
+ private final List<String> classArgs;
+
+ SparkClassCommandBuilder(String className, List<String> classArgs) {
+ this.className = className;
+ this.classArgs = classArgs;
+ }
+
+ @Override
+ public List<String> buildCommand(Map<String, String> env) throws IOException {
+ List<String> javaOptsKeys = new ArrayList<String>();
+ String memKey = null;
+ String extraClassPath = null;
+
+ // Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) +
+ // SPARK_DAEMON_MEMORY.
+ if (className.equals("org.apache.spark.deploy.master.Master")) {
+ javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_MASTER_OPTS");
+ memKey = "SPARK_DAEMON_MEMORY";
+ } else if (className.equals("org.apache.spark.deploy.worker.Worker")) {
+ javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_WORKER_OPTS");
+ memKey = "SPARK_DAEMON_MEMORY";
+ } else if (className.equals("org.apache.spark.deploy.history.HistoryServer")) {
+ javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_HISTORY_OPTS");
+ memKey = "SPARK_DAEMON_MEMORY";
+ } else if (className.equals("org.apache.spark.executor.CoarseGrainedExecutorBackend")) {
+ javaOptsKeys.add("SPARK_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
+ memKey = "SPARK_EXECUTOR_MEMORY";
+ } else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) {
+ javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
+ memKey = "SPARK_EXECUTOR_MEMORY";
+ } else if (className.startsWith("org.apache.spark.tools.")) {
+ String sparkHome = getSparkHome();
+ File toolsDir = new File(join(File.separator, sparkHome, "tools", "target",
+ "scala-" + getScalaVersion()));
+ checkState(toolsDir.isDirectory(), "Cannot find tools build directory.");
+
+ Pattern re = Pattern.compile("spark-tools_.*\\.jar");
+ for (File f : toolsDir.listFiles()) {
+ if (re.matcher(f.getName()).matches()) {
+ extraClassPath = f.getAbsolutePath();
+ break;
+ }
+ }
+
+ checkState(extraClassPath != null,
+ "Failed to find Spark Tools Jar in %s.\n" +
+ "You need to run \"build/sbt tools/package\" before running %s.",
+ toolsDir.getAbsolutePath(), className);
+
+ javaOptsKeys.add("SPARK_JAVA_OPTS");
+ }
+
+ List<String> cmd = buildJavaCommand(extraClassPath);
+ for (String key : javaOptsKeys) {
+ addOptionString(cmd, System.getenv(key));
+ }
+
+ String mem = firstNonEmpty(memKey != null ? System.getenv(memKey) : null, DEFAULT_MEM);
+ cmd.add("-Xms" + mem);
+ cmd.add("-Xmx" + mem);
+ addPermGenSizeOpt(cmd);
+ cmd.add(className);
+ cmd.addAll(classArgs);
+ return cmd;
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
new file mode 100644
index 0000000000..b566507ee6
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Launcher for Spark applications.
+ * <p/>
+ * Use this class to start Spark applications programmatically. The class uses a builder pattern
+ * to allow clients to configure the Spark application and launch it as a child process.
+ */
+public class SparkLauncher {
+
+ /** The Spark master. */
+ public static final String SPARK_MASTER = "spark.master";
+
+ /** Configuration key for the driver memory. */
+ public static final String DRIVER_MEMORY = "spark.driver.memory";
+ /** Configuration key for the driver class path. */
+ public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
+ /** Configuration key for the driver VM options. */
+ public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";
+ /** Configuration key for the driver native library path. */
+ public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath";
+
+ /** Configuration key for the executor memory. */
+ public static final String EXECUTOR_MEMORY = "spark.executor.memory";
+ /** Configuration key for the executor class path. */
+ public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
+ /** Configuration key for the executor VM options. */
+ public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
+ /** Configuration key for the executor native library path. */
+ public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryOptions";
+ /** Configuration key for the number of executor CPU cores. */
+ public static final String EXECUTOR_CORES = "spark.executor.cores";
+
+ private final SparkSubmitCommandBuilder builder;
+
+ public SparkLauncher() {
+ this(null);
+ }
+
+ /**
+ * Creates a launcher that will set the given environment variables in the child.
+ *
+ * @param env Environment variables to set.
+ */
+ public SparkLauncher(Map<String, String> env) {
+ this.builder = new SparkSubmitCommandBuilder();
+ if (env != null) {
+ this.builder.childEnv.putAll(env);
+ }
+ }
+
+ /**
+ * Set a custom JAVA_HOME for launching the Spark application.
+ *
+ * @param javaHome Path to the JAVA_HOME to use.
+ * @return This launcher.
+ */
+ public SparkLauncher setJavaHome(String javaHome) {
+ checkNotNull(javaHome, "javaHome");
+ builder.javaHome = javaHome;
+ return this;
+ }
+
+ /**
+ * Set a custom Spark installation location for the application.
+ *
+ * @param sparkHome Path to the Spark installation to use.
+ * @return This launcher.
+ */
+ public SparkLauncher setSparkHome(String sparkHome) {
+ checkNotNull(sparkHome, "sparkHome");
+ builder.childEnv.put(ENV_SPARK_HOME, sparkHome);
+ return this;
+ }
+
+ /**
+ * Set a custom properties file with Spark configuration for the application.
+ *
+ * @param path Path to custom properties file to use.
+ * @return This launcher.
+ */
+ public SparkLauncher setPropertiesFile(String path) {
+ checkNotNull(path, "path");
+ builder.propertiesFile = path;
+ return this;
+ }
+
+ /**
+ * Set a single configuration value for the application.
+ *
+ * @param key Configuration key.
+ * @param value The value to use.
+ * @return This launcher.
+ */
+ public SparkLauncher setConf(String key, String value) {
+ checkNotNull(key, "key");
+ checkNotNull(value, "value");
+ checkArgument(key.startsWith("spark."), "'key' must start with 'spark.'");
+ builder.conf.put(key, value);
+ return this;
+ }
+
+ /**
+ * Set the application name.
+ *
+ * @param appName Application name.
+ * @return This launcher.
+ */
+ public SparkLauncher setAppName(String appName) {
+ checkNotNull(appName, "appName");
+ builder.appName = appName;
+ return this;
+ }
+
+ /**
+ * Set the Spark master for the application.
+ *
+ * @param master Spark master.
+ * @return This launcher.
+ */
+ public SparkLauncher setMaster(String master) {
+ checkNotNull(master, "master");
+ builder.master = master;
+ return this;
+ }
+
+ /**
+ * Set the deploy mode for the application.
+ *
+ * @param mode Deploy mode.
+ * @return This launcher.
+ */
+ public SparkLauncher setDeployMode(String mode) {
+ checkNotNull(mode, "mode");
+ builder.deployMode = mode;
+ return this;
+ }
+
+ /**
+ * Set the main application resource. This should be the location of a jar file for Scala/Java
+ * applications, or a python script for PySpark applications.
+ *
+ * @param resource Path to the main application resource.
+ * @return This launcher.
+ */
+ public SparkLauncher setAppResource(String resource) {
+ checkNotNull(resource, "resource");
+ builder.appResource = resource;
+ return this;
+ }
+
+ /**
+ * Sets the application class name for Java/Scala applications.
+ *
+ * @param mainClass Application's main class.
+ * @return This launcher.
+ */
+ public SparkLauncher setMainClass(String mainClass) {
+ checkNotNull(mainClass, "mainClass");
+ builder.mainClass = mainClass;
+ return this;
+ }
+
+ /**
+ * Adds command line arguments for the application.
+ *
+ * @param args Arguments to pass to the application's main class.
+ * @return This launcher.
+ */
+ public SparkLauncher addAppArgs(String... args) {
+ for (String arg : args) {
+ checkNotNull(arg, "arg");
+ builder.appArgs.add(arg);
+ }
+ return this;
+ }
+
+ /**
+ * Adds a jar file to be submitted with the application.
+ *
+ * @param jar Path to the jar file.
+ * @return This launcher.
+ */
+ public SparkLauncher addJar(String jar) {
+ checkNotNull(jar, "jar");
+ builder.jars.add(jar);
+ return this;
+ }
+
+ /**
+ * Adds a file to be submitted with the application.
+ *
+ * @param file Path to the file.
+ * @return This launcher.
+ */
+ public SparkLauncher addFile(String file) {
+ checkNotNull(file, "file");
+ builder.files.add(file);
+ return this;
+ }
+
+ /**
+ * Adds a python file / zip / egg to be submitted with the application.
+ *
+ * @param file Path to the file.
+ * @return This launcher.
+ */
+ public SparkLauncher addPyFile(String file) {
+ checkNotNull(file, "file");
+ builder.pyFiles.add(file);
+ return this;
+ }
+
+ /**
+ * Enables verbose reporting for SparkSubmit.
+ *
+ * @param verbose Whether to enable verbose output.
+ * @return This launcher.
+ */
+ public SparkLauncher setVerbose(boolean verbose) {
+ builder.verbose = verbose;
+ return this;
+ }
+
+ /**
+ * Launches a sub-process that will start the configured Spark application.
+ *
+ * @return A process handle for the Spark app.
+ */
+ public Process launch() throws IOException {
+ List<String> cmd = new ArrayList<String>();
+ String script = isWindows() ? "spark-submit.cmd" : "spark-submit";
+ cmd.add(join(File.separator, builder.getSparkHome(), "bin", script));
+ cmd.addAll(builder.buildSparkSubmitArgs());
+
+ // Since the child process is a batch script, let's quote things so that special characters are
+ // preserved, otherwise the batch interpreter will mess up the arguments. Batch scripts are
+ // weird.
+ if (isWindows()) {
+ List<String> winCmd = new ArrayList<String>();
+ for (String arg : cmd) {
+ winCmd.add(quoteForBatchScript(arg));
+ }
+ cmd = winCmd;
+ }
+
+ ProcessBuilder pb = new ProcessBuilder(cmd.toArray(new String[cmd.size()]));
+ for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
+ pb.environment().put(e.getKey(), e.getValue());
+ }
+ return pb.start();
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
new file mode 100644
index 0000000000..6ffdff63d3
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Special command builder for handling a CLI invocation of SparkSubmit.
+ * <p/>
+ * This builder adds command line parsing compatible with SparkSubmit. It handles setting
+ * driver-side options and special parsing behavior needed for the special-casing certain internal
+ * Spark applications.
+ * <p/>
+ * This class has also some special features to aid launching pyspark.
+ */
+class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
+
+ /**
+ * Name of the app resource used to identify the PySpark shell. The command line parser expects
+ * the resource name to be the very first argument to spark-submit in this case.
+ *
+ * NOTE: this cannot be "pyspark-shell" since that identifies the PySpark shell to SparkSubmit
+ * (see java_gateway.py), and can cause this code to enter into an infinite loop.
+ */
+ static final String PYSPARK_SHELL = "pyspark-shell-main";
+
+ /**
+ * This is the actual resource name that identifies the PySpark shell to SparkSubmit.
+ */
+ static final String PYSPARK_SHELL_RESOURCE = "pyspark-shell";
+
+ /**
+ * This map must match the class names for available special classes, since this modifies the way
+ * command line parsing works. This maps the class name to the resource to use when calling
+ * spark-submit.
+ */
+ private static final Map<String, String> specialClasses = new HashMap<String, String>();
+ static {
+ specialClasses.put("org.apache.spark.repl.Main", "spark-shell");
+ specialClasses.put("org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver",
+ "spark-internal");
+ specialClasses.put("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
+ "spark-internal");
+ }
+
+ private final List<String> sparkArgs;
+
+ /**
+ * Controls whether mixing spark-submit arguments with app arguments is allowed. This is needed
+ * to parse the command lines for things like bin/spark-shell, which allows users to mix and
+ * match arguments (e.g. "bin/spark-shell SparkShellArg --master foo").
+ */
+ private boolean allowsMixedArguments;
+
+ SparkSubmitCommandBuilder() {
+ this.sparkArgs = new ArrayList<String>();
+ }
+
+ SparkSubmitCommandBuilder(List<String> args) {
+ this();
+ List<String> submitArgs = args;
+ if (args.size() > 0 && args.get(0).equals(PYSPARK_SHELL)) {
+ this.allowsMixedArguments = true;
+ appResource = PYSPARK_SHELL_RESOURCE;
+ submitArgs = args.subList(1, args.size());
+ } else {
+ this.allowsMixedArguments = false;
+ }
+
+ new OptionParser().parse(submitArgs);
+ }
+
+ @Override
+ public List<String> buildCommand(Map<String, String> env) throws IOException {
+ if (PYSPARK_SHELL_RESOURCE.equals(appResource)) {
+ return buildPySparkShellCommand(env);
+ } else {
+ return buildSparkSubmitCommand(env);
+ }
+ }
+
+ List<String> buildSparkSubmitArgs() {
+ List<String> args = new ArrayList<String>();
+ SparkSubmitOptionParser parser = new SparkSubmitOptionParser();
+
+ if (verbose) {
+ args.add(parser.VERBOSE);
+ }
+
+ if (master != null) {
+ args.add(parser.MASTER);
+ args.add(master);
+ }
+
+ if (deployMode != null) {
+ args.add(parser.DEPLOY_MODE);
+ args.add(deployMode);
+ }
+
+ if (appName != null) {
+ args.add(parser.NAME);
+ args.add(appName);
+ }
+
+ for (Map.Entry<String, String> e : conf.entrySet()) {
+ args.add(parser.CONF);
+ args.add(String.format("%s=%s", e.getKey(), e.getValue()));
+ }
+
+ if (propertiesFile != null) {
+ args.add(parser.PROPERTIES_FILE);
+ args.add(propertiesFile);
+ }
+
+ if (!jars.isEmpty()) {
+ args.add(parser.JARS);
+ args.add(join(",", jars));
+ }
+
+ if (!files.isEmpty()) {
+ args.add(parser.FILES);
+ args.add(join(",", files));
+ }
+
+ if (!pyFiles.isEmpty()) {
+ args.add(parser.PY_FILES);
+ args.add(join(",", pyFiles));
+ }
+
+ if (mainClass != null) {
+ args.add(parser.CLASS);
+ args.add(mainClass);
+ }
+
+ args.addAll(sparkArgs);
+ if (appResource != null) {
+ args.add(appResource);
+ }
+ args.addAll(appArgs);
+
+ return args;
+ }
+
+ private List<String> buildSparkSubmitCommand(Map<String, String> env) throws IOException {
+ // Load the properties file and check whether spark-submit will be running the app's driver
+ // or just launching a cluster app. When running the driver, the JVM's argument will be
+ // modified to cover the driver's configuration.
+ Properties props = loadPropertiesFile();
+ boolean isClientMode = isClientMode(props);
+ String extraClassPath = isClientMode ?
+ firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_CLASSPATH, conf, props) : null;
+
+ List<String> cmd = buildJavaCommand(extraClassPath);
+ addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
+ addOptionString(cmd, System.getenv("SPARK_JAVA_OPTS"));
+
+ if (isClientMode) {
+ // Figuring out where the memory value come from is a little tricky due to precedence.
+ // Precedence is observed in the following order:
+ // - explicit configuration (setConf()), which also covers --driver-memory cli argument.
+ // - properties file.
+ // - SPARK_DRIVER_MEMORY env variable
+ // - SPARK_MEM env variable
+ // - default value (512m)
+ String memory = firstNonEmpty(firstNonEmptyValue(SparkLauncher.DRIVER_MEMORY, conf, props),
+ System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
+ cmd.add("-Xms" + memory);
+ cmd.add("-Xmx" + memory);
+ addOptionString(cmd, firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, conf, props));
+ mergeEnvPathList(env, getLibPathEnvName(),
+ firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, conf, props));
+ }
+
+ addPermGenSizeOpt(cmd);
+ cmd.add("org.apache.spark.deploy.SparkSubmit");
+ cmd.addAll(buildSparkSubmitArgs());
+ return cmd;
+ }
+
+ private List<String> buildPySparkShellCommand(Map<String, String> env) throws IOException {
+ // For backwards compatibility, if a script is specified in
+ // the pyspark command line, then run it using spark-submit.
+ if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".py")) {
+ System.err.println(
+ "WARNING: Running python applications through 'pyspark' is deprecated as of Spark 1.0.\n" +
+ "Use ./bin/spark-submit <python file>");
+ appResource = appArgs.get(0);
+ appArgs.remove(0);
+ return buildCommand(env);
+ }
+
+ // When launching the pyspark shell, the spark-submit arguments should be stored in the
+ // PYSPARK_SUBMIT_ARGS env variable. The executable is the PYSPARK_DRIVER_PYTHON env variable
+ // set by the pyspark script, followed by PYSPARK_DRIVER_PYTHON_OPTS.
+ checkArgument(appArgs.isEmpty(), "pyspark does not support any application options.");
+
+ Properties props = loadPropertiesFile();
+ mergeEnvPathList(env, getLibPathEnvName(),
+ firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, conf, props));
+
+ // Store spark-submit arguments in an environment variable, since there's no way to pass
+ // them to shell.py on the comand line.
+ StringBuilder submitArgs = new StringBuilder();
+ for (String arg : buildSparkSubmitArgs()) {
+ if (submitArgs.length() > 0) {
+ submitArgs.append(" ");
+ }
+ submitArgs.append(quoteForPython(arg));
+ }
+ env.put("PYSPARK_SUBMIT_ARGS", submitArgs.toString());
+
+ List<String> pyargs = new ArrayList<String>();
+ pyargs.add(firstNonEmpty(System.getenv("PYSPARK_DRIVER_PYTHON"), "python"));
+ String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
+ if (!isEmpty(pyOpts)) {
+ pyargs.addAll(parseOptionString(pyOpts));
+ }
+
+ return pyargs;
+ }
+
+ private boolean isClientMode(Properties userProps) {
+ String userMaster = firstNonEmpty(master, (String) userProps.get(SparkLauncher.SPARK_MASTER));
+ // Default master is "local[*]", so assume client mode in that case.
+ return userMaster == null ||
+ "client".equals(deployMode) ||
+ (!userMaster.equals("yarn-cluster") && deployMode == null);
+ }
+
+ private class OptionParser extends SparkSubmitOptionParser {
+
+ private final List<String> driverJvmKeys = Arrays.asList(
+ SparkLauncher.DRIVER_EXTRA_CLASSPATH,
+ SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
+ SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH,
+ SparkLauncher.DRIVER_MEMORY);
+
+ @Override
+ protected boolean handle(String opt, String value) {
+ if (opt.equals(MASTER)) {
+ master = value;
+ } else if (opt.equals(DEPLOY_MODE)) {
+ deployMode = value;
+ } else if (opt.equals(PROPERTIES_FILE)) {
+ propertiesFile = value;
+ } else if (opt.equals(DRIVER_MEMORY)) {
+ conf.put(SparkLauncher.DRIVER_MEMORY, value);
+ } else if (opt.equals(DRIVER_JAVA_OPTIONS)) {
+ conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
+ } else if (opt.equals(DRIVER_LIBRARY_PATH)) {
+ conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
+ } else if (opt.equals(DRIVER_CLASS_PATH)) {
+ conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
+ } else if (opt.equals(CONF)) {
+ String[] setConf = value.split("=", 2);
+ checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
+ if (driverJvmKeys.contains(setConf[0])) {
+ conf.put(setConf[0], setConf[1]);
+ }
+ } else if (opt.equals(CLASS)) {
+ // The special classes require some special command line handling, since they allow
+ // mixing spark-submit arguments with arguments that should be propagated to the shell
+ // itself. Note that for this to work, the "--class" argument must come before any
+ // non-spark-submit arguments.
+ mainClass = value;
+ if (specialClasses.containsKey(value)) {
+ allowsMixedArguments = true;
+ appResource = specialClasses.get(value);
+ }
+ } else {
+ sparkArgs.add(opt);
+ if (value != null) {
+ sparkArgs.add(value);
+ }
+ }
+ return true;
+ }
+
+ @Override
+ protected boolean handleUnknown(String opt) {
+ // When mixing arguments, add unrecognized parameters directly to the user arguments list. In
+ // normal mode, any unrecognized parameter triggers the end of command line parsing, and the
+ // parameter itself will be interpreted by SparkSubmit as the application resource. The
+ // remaining params will be appended to the list of SparkSubmit arguments.
+ if (allowsMixedArguments) {
+ appArgs.add(opt);
+ return true;
+ } else {
+ sparkArgs.add(opt);
+ return false;
+ }
+ }
+
+ @Override
+ protected void handleExtraArgs(List<String> extra) {
+ for (String arg : extra) {
+ sparkArgs.add(arg);
+ }
+ }
+
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
new file mode 100644
index 0000000000..8526d2e7cf
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Parser for spark-submit command line options.
+ * <p/>
+ * This class encapsulates the parsing code for spark-submit command line options, so that there
+ * is a single list of options that needs to be maintained (well, sort of, but it makes it harder
+ * to break things).
+ */
+class SparkSubmitOptionParser {
+
+ // The following constants define the "main" name for the available options. They're defined
+ // to avoid copy & paste of the raw strings where they're needed.
+ //
+ // The fields are not static so that they're exposed to Scala code that uses this class. See
+ // SparkSubmitArguments.scala. That is also why this class is not abstract - to allow code to
+ // easily use these constants without having to create dummy implementations of this class.
+ protected final String CLASS = "--class";
+ protected final String CONF = "--conf";
+ protected final String DEPLOY_MODE = "--deploy-mode";
+ protected final String DRIVER_CLASS_PATH = "--driver-class-path";
+ protected final String DRIVER_CORES = "--driver-cores";
+ protected final String DRIVER_JAVA_OPTIONS = "--driver-java-options";
+ protected final String DRIVER_LIBRARY_PATH = "--driver-library-path";
+ protected final String DRIVER_MEMORY = "--driver-memory";
+ protected final String EXECUTOR_MEMORY = "--executor-memory";
+ protected final String FILES = "--files";
+ protected final String JARS = "--jars";
+ protected final String KILL_SUBMISSION = "--kill";
+ protected final String MASTER = "--master";
+ protected final String NAME = "--name";
+ protected final String PACKAGES = "--packages";
+ protected final String PROPERTIES_FILE = "--properties-file";
+ protected final String PROXY_USER = "--proxy-user";
+ protected final String PY_FILES = "--py-files";
+ protected final String REPOSITORIES = "--repositories";
+ protected final String STATUS = "--status";
+ protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";
+
+ // Options that do not take arguments.
+ protected final String HELP = "--help";
+ protected final String SUPERVISE = "--supervise";
+ protected final String VERBOSE = "--verbose";
+ protected final String VERSION = "--version";
+
+ // Standalone-only options.
+
+ // YARN-only options.
+ protected final String ARCHIVES = "--archives";
+ protected final String EXECUTOR_CORES = "--executor-cores";
+ protected final String QUEUE = "--queue";
+ protected final String NUM_EXECUTORS = "--num-executors";
+
+ /**
+ * This is the canonical list of spark-submit options. Each entry in the array contains the
+ * different aliases for the same option; the first element of each entry is the "official"
+ * name of the option, passed to {@link #handle(String, String)}.
+ * <p/>
+ * Options not listed here nor in the "switch" list below will result in a call to
+ * {@link $#handleUnknown(String)}.
+ * <p/>
+ * These two arrays are visible for tests.
+ */
+ final String[][] opts = {
+ { ARCHIVES },
+ { CLASS },
+ { CONF, "-c" },
+ { DEPLOY_MODE },
+ { DRIVER_CLASS_PATH },
+ { DRIVER_CORES },
+ { DRIVER_JAVA_OPTIONS },
+ { DRIVER_LIBRARY_PATH },
+ { DRIVER_MEMORY },
+ { EXECUTOR_CORES },
+ { EXECUTOR_MEMORY },
+ { FILES },
+ { JARS },
+ { KILL_SUBMISSION },
+ { MASTER },
+ { NAME },
+ { NUM_EXECUTORS },
+ { PACKAGES },
+ { PROPERTIES_FILE },
+ { PROXY_USER },
+ { PY_FILES },
+ { QUEUE },
+ { REPOSITORIES },
+ { STATUS },
+ { TOTAL_EXECUTOR_CORES },
+ };
+
+ /**
+ * List of switches (command line options that do not take parameters) recognized by spark-submit.
+ */
+ final String[][] switches = {
+ { HELP, "-h" },
+ { SUPERVISE },
+ { VERBOSE, "-v" },
+ { VERSION },
+ };
+
+ /**
+ * Parse a list of spark-submit command line options.
+ * <p/>
+ * See SparkSubmitArguments.scala for a more formal description of available options.
+ *
+ * @throws IllegalArgumentException If an error is found during parsing.
+ */
+ protected final void parse(List<String> args) {
+ Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");
+
+ int idx = 0;
+ for (idx = 0; idx < args.size(); idx++) {
+ String arg = args.get(idx);
+ String value = null;
+
+ Matcher m = eqSeparatedOpt.matcher(arg);
+ if (m.matches()) {
+ arg = m.group(1);
+ value = m.group(2);
+ }
+
+ // Look for options with a value.
+ String name = findCliOption(arg, opts);
+ if (name != null) {
+ if (value == null) {
+ if (idx == args.size() - 1) {
+ throw new IllegalArgumentException(
+ String.format("Missing argument for option '%s'.", arg));
+ }
+ idx++;
+ value = args.get(idx);
+ }
+ if (!handle(name, value)) {
+ break;
+ }
+ continue;
+ }
+
+ // Look for a switch.
+ name = findCliOption(arg, switches);
+ if (name != null) {
+ if (!handle(name, null)) {
+ break;
+ }
+ continue;
+ }
+
+ if (!handleUnknown(arg)) {
+ break;
+ }
+ }
+
+ if (idx < args.size()) {
+ idx++;
+ }
+ handleExtraArgs(args.subList(idx, args.size()));
+ }
+
+ /**
+ * Callback for when an option with an argument is parsed.
+ *
+ * @param opt The long name of the cli option (might differ from actual command line).
+ * @param value The value. This will be <i>null</i> if the option does not take a value.
+ * @return Whether to continue parsing the argument list.
+ */
+ protected boolean handle(String opt, String value) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Callback for when an unrecognized option is parsed.
+ *
+ * @param opt Unrecognized option from the command line.
+ * @return Whether to continue parsing the argument list.
+ */
+ protected boolean handleUnknown(String opt) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Callback for remaining command line arguments after either {@link #handle(String, String)} or
+ * {@link #handleUnknown(String)} return "false". This will be called at the end of parsing even
+ * when there are no remaining arguments.
+ *
+ * @param extra List of remaining arguments.
+ */
+ protected void handleExtraArgs(List<String> extra) {
+ throw new UnsupportedOperationException();
+ }
+
+ private String findCliOption(String name, String[][] available) {
+ for (String[] candidates : available) {
+ for (String candidate : candidates) {
+ if (candidate.equals(name)) {
+ return candidates[0];
+ }
+ }
+ }
+ return null;
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/package-info.java b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
new file mode 100644
index 0000000000..7ed756f4b8
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Library for launching Spark applications.
+ * <p/>
+ * This library allows applications to launch Spark programmatically. There's only one entry
+ * point to the library - the {@link org.apache.spark.launcher.SparkLauncher} class.
+ * <p/>
+ * To launch a Spark application, just instantiate a {@link org.apache.spark.launcher.SparkLauncher}
+ * and configure the application to run. For example:
+ *
+ * <pre>
+ * {@code
+ * import org.apache.spark.launcher.SparkLauncher;
+ *
+ * public class MyLauncher {
+ * public static void main(String[] args) throws Exception {
+ * Process spark = new SparkLauncher()
+ * .setAppResource("/my/app.jar")
+ * .setMainClass("my.spark.app.Main")
+ * .setMaster("local")
+ * .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
+ * .launch();
+ * spark.waitFor();
+ * }
+ * }
+ * }
+ * </pre>
+ */
+package org.apache.spark.launcher;
diff --git a/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java b/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java
new file mode 100644
index 0000000000..dba0203867
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+public class CommandBuilderUtilsSuite {
+
+ @Test
+ public void testValidOptionStrings() {
+ testOpt("a b c d e", Arrays.asList("a", "b", "c", "d", "e"));
+ testOpt("a 'b c' \"d\" e", Arrays.asList("a", "b c", "d", "e"));
+ testOpt("a 'b\\\"c' \"'d'\" e", Arrays.asList("a", "b\\\"c", "'d'", "e"));
+ testOpt("a 'b\"c' \"\\\"d\\\"\" e", Arrays.asList("a", "b\"c", "\"d\"", "e"));
+ testOpt(" a b c \\\\ ", Arrays.asList("a", "b", "c", "\\"));
+
+ // Following tests ported from UtilsSuite.scala.
+ testOpt("", new ArrayList<String>());
+ testOpt("a", Arrays.asList("a"));
+ testOpt("aaa", Arrays.asList("aaa"));
+ testOpt("a b c", Arrays.asList("a", "b", "c"));
+ testOpt(" a b\t c ", Arrays.asList("a", "b", "c"));
+ testOpt("a 'b c'", Arrays.asList("a", "b c"));
+ testOpt("a 'b c' d", Arrays.asList("a", "b c", "d"));
+ testOpt("'b c'", Arrays.asList("b c"));
+ testOpt("a \"b c\"", Arrays.asList("a", "b c"));
+ testOpt("a \"b c\" d", Arrays.asList("a", "b c", "d"));
+ testOpt("\"b c\"", Arrays.asList("b c"));
+ testOpt("a 'b\" c' \"d' e\"", Arrays.asList("a", "b\" c", "d' e"));
+ testOpt("a\t'b\nc'\nd", Arrays.asList("a", "b\nc", "d"));
+ testOpt("a \"b\\\\c\"", Arrays.asList("a", "b\\c"));
+ testOpt("a \"b\\\"c\"", Arrays.asList("a", "b\"c"));
+ testOpt("a 'b\\\"c'", Arrays.asList("a", "b\\\"c"));
+ testOpt("'a'b", Arrays.asList("ab"));
+ testOpt("'a''b'", Arrays.asList("ab"));
+ testOpt("\"a\"b", Arrays.asList("ab"));
+ testOpt("\"a\"\"b\"", Arrays.asList("ab"));
+ testOpt("''", Arrays.asList(""));
+ testOpt("\"\"", Arrays.asList(""));
+ }
+
+ @Test
+ public void testInvalidOptionStrings() {
+ testInvalidOpt("\\");
+ testInvalidOpt("\"abcde");
+ testInvalidOpt("'abcde");
+ }
+
+ @Test
+ public void testWindowsBatchQuoting() {
+ assertEquals("abc", quoteForBatchScript("abc"));
+ assertEquals("\"a b c\"", quoteForBatchScript("a b c"));
+ assertEquals("\"a \"\"b\"\" c\"", quoteForBatchScript("a \"b\" c"));
+ assertEquals("\"a\"\"b\"\"c\"", quoteForBatchScript("a\"b\"c"));
+ assertEquals("\"ab^=\"\"cd\"\"\"", quoteForBatchScript("ab=\"cd\""));
+ }
+
+ @Test
+ public void testPythonArgQuoting() {
+ assertEquals("\"abc\"", quoteForPython("abc"));
+ assertEquals("\"a b c\"", quoteForPython("a b c"));
+ assertEquals("\"a \\\"b\\\" c\"", quoteForPython("a \"b\" c"));
+ }
+
+ private void testOpt(String opts, List<String> expected) {
+ assertEquals(String.format("test string failed to parse: [[ %s ]]", opts),
+ expected, parseOptionString(opts));
+ }
+
+ private void testInvalidOpt(String opts) {
+ try {
+ parseOptionString(opts);
+ fail("Expected exception for invalid option string.");
+ } catch (IllegalArgumentException e) {
+ // pass.
+ }
+ }
+
+}
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
new file mode 100644
index 0000000000..252d5abae1
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
+/**
+ * These tests require the Spark assembly to be built before they can be run.
+ */
+public class SparkLauncherSuite {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
+
+ @Test
+ public void testChildProcLauncher() throws Exception {
+ Map<String, String> env = new HashMap<String, String>();
+ env.put("SPARK_PRINT_LAUNCH_COMMAND", "1");
+
+ SparkLauncher launcher = new SparkLauncher(env)
+ .setSparkHome(System.getProperty("spark.test.home"))
+ .setMaster("local")
+ .setAppResource("spark-internal")
+ .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
+ "-Dfoo=bar -Dtest.name=-testChildProcLauncher")
+ .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
+ .setMainClass(SparkLauncherTestApp.class.getName())
+ .addAppArgs("proc");
+ final Process app = launcher.launch();
+ new Redirector("stdout", app.getInputStream()).start();
+ new Redirector("stderr", app.getErrorStream()).start();
+ assertEquals(0, app.waitFor());
+ }
+
+ public static class SparkLauncherTestApp {
+
+ public static void main(String[] args) throws Exception {
+ assertEquals(1, args.length);
+ assertEquals("proc", args[0]);
+ assertEquals("bar", System.getProperty("foo"));
+ assertEquals("local", System.getProperty(SparkLauncher.SPARK_MASTER));
+ }
+
+ }
+
+ private static class Redirector extends Thread {
+
+ private final InputStream in;
+
+ Redirector(String name, InputStream in) {
+ this.in = in;
+ setName(name);
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
+ String line;
+ while ((line = reader.readLine()) != null) {
+ LOG.warn(line);
+ }
+ } catch (Exception e) {
+ LOG.error("Error reading process output.", e);
+ }
+ }
+
+ }
+
+}
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
new file mode 100644
index 0000000000..815edc4e49
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class SparkSubmitCommandBuilderSuite {
+
+ private static File dummyPropsFile;
+ private static SparkSubmitOptionParser parser;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ dummyPropsFile = File.createTempFile("spark", "properties");
+ parser = new SparkSubmitOptionParser();
+ }
+
+ @AfterClass
+ public static void cleanUp() throws Exception {
+ dummyPropsFile.delete();
+ }
+
+ @Test
+ public void testDriverCmdBuilder() throws Exception {
+ testCmdBuilder(true);
+ }
+
+ @Test
+ public void testClusterCmdBuilder() throws Exception {
+ testCmdBuilder(false);
+ }
+
+ @Test
+ public void testCliParser() throws Exception {
+ List<String> sparkSubmitArgs = Arrays.asList(
+ parser.MASTER,
+ "local",
+ parser.DRIVER_MEMORY,
+ "42g",
+ parser.DRIVER_CLASS_PATH,
+ "/driverCp",
+ parser.DRIVER_JAVA_OPTIONS,
+ "extraJavaOpt",
+ parser.CONF,
+ SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH + "=/driverLibPath");
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> cmd = buildCommand(sparkSubmitArgs, env);
+
+ assertTrue(findInStringList(env.get(CommandBuilderUtils.getLibPathEnvName()),
+ File.pathSeparator, "/driverLibPath"));
+ assertTrue(findInStringList(findArgValue(cmd, "-cp"), File.pathSeparator, "/driverCp"));
+ assertTrue("Driver -Xms should be configured.", cmd.contains("-Xms42g"));
+ assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx42g"));
+ }
+
+ @Test
+ public void testShellCliParser() throws Exception {
+ List<String> sparkSubmitArgs = Arrays.asList(
+ parser.CLASS,
+ "org.apache.spark.repl.Main",
+ parser.MASTER,
+ "foo",
+ "--app-arg",
+ "bar",
+ "--app-switch",
+ parser.FILES,
+ "baz",
+ parser.NAME,
+ "appName");
+
+ List<String> args = new SparkSubmitCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
+ List<String> expected = Arrays.asList("spark-shell", "--app-arg", "bar", "--app-switch");
+ assertEquals(expected, args.subList(args.size() - expected.size(), args.size()));
+ }
+
+ @Test
+ public void testAlternateSyntaxParsing() throws Exception {
+ List<String> sparkSubmitArgs = Arrays.asList(
+ parser.CLASS + "=org.my.Class",
+ parser.MASTER + "=foo",
+ parser.DEPLOY_MODE + "=bar");
+
+ List<String> cmd = new SparkSubmitCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
+ assertEquals("org.my.Class", findArgValue(cmd, parser.CLASS));
+ assertEquals("foo", findArgValue(cmd, parser.MASTER));
+ assertEquals("bar", findArgValue(cmd, parser.DEPLOY_MODE));
+ }
+
+ @Test
+ public void testPySparkLauncher() throws Exception {
+ List<String> sparkSubmitArgs = Arrays.asList(
+ SparkSubmitCommandBuilder.PYSPARK_SHELL,
+ "--master=foo",
+ "--deploy-mode=bar");
+
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> cmd = buildCommand(sparkSubmitArgs, env);
+ assertEquals("python", cmd.get(cmd.size() - 1));
+ assertEquals(
+ String.format("\"%s\" \"foo\" \"%s\" \"bar\" \"%s\"",
+ parser.MASTER, parser.DEPLOY_MODE, SparkSubmitCommandBuilder.PYSPARK_SHELL_RESOURCE),
+ env.get("PYSPARK_SUBMIT_ARGS"));
+ }
+
+ @Test
+ public void testPySparkFallback() throws Exception {
+ List<String> sparkSubmitArgs = Arrays.asList(
+ "--master=foo",
+ "--deploy-mode=bar",
+ "script.py",
+ "arg1");
+
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> cmd = buildCommand(sparkSubmitArgs, env);
+
+ assertEquals("foo", findArgValue(cmd, "--master"));
+ assertEquals("bar", findArgValue(cmd, "--deploy-mode"));
+ assertEquals("script.py", cmd.get(cmd.size() - 2));
+ assertEquals("arg1", cmd.get(cmd.size() - 1));
+ }
+
+ private void testCmdBuilder(boolean isDriver) throws Exception {
+ String deployMode = isDriver ? "client" : "cluster";
+
+ SparkSubmitCommandBuilder launcher =
+ new SparkSubmitCommandBuilder(Collections.<String>emptyList());
+ launcher.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME,
+ System.getProperty("spark.test.home"));
+ launcher.master = "yarn";
+ launcher.deployMode = deployMode;
+ launcher.appResource = "/foo";
+ launcher.appName = "MyApp";
+ launcher.mainClass = "my.Class";
+ launcher.propertiesFile = dummyPropsFile.getAbsolutePath();
+ launcher.appArgs.add("foo");
+ launcher.appArgs.add("bar");
+ launcher.conf.put(SparkLauncher.DRIVER_MEMORY, "1g");
+ launcher.conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, "/driver");
+ launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Ddriver -XX:MaxPermSize=256m");
+ launcher.conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, "/native");
+ launcher.conf.put("spark.foo", "foo");
+
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> cmd = launcher.buildCommand(env);
+
+ // Checks below are different for driver and non-driver mode.
+
+ if (isDriver) {
+ assertTrue("Driver -Xms should be configured.", cmd.contains("-Xms1g"));
+ assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx1g"));
+ } else {
+ boolean found = false;
+ for (String arg : cmd) {
+ if (arg.startsWith("-Xms") || arg.startsWith("-Xmx")) {
+ found = true;
+ break;
+ }
+ }
+ assertFalse("Memory arguments should not be set.", found);
+ }
+
+ for (String arg : cmd) {
+ if (arg.startsWith("-XX:MaxPermSize=")) {
+ if (isDriver) {
+ assertEquals("-XX:MaxPermSize=256m", arg);
+ } else {
+ assertEquals("-XX:MaxPermSize=128m", arg);
+ }
+ }
+ }
+
+ String[] cp = findArgValue(cmd, "-cp").split(Pattern.quote(File.pathSeparator));
+ if (isDriver) {
+ assertTrue("Driver classpath should contain provided entry.", contains("/driver", cp));
+ } else {
+ assertFalse("Driver classpath should not be in command.", contains("/driver", cp));
+ }
+
+ String libPath = env.get(CommandBuilderUtils.getLibPathEnvName());
+ if (isDriver) {
+ assertNotNull("Native library path should be set.", libPath);
+ assertTrue("Native library path should contain provided entry.",
+ contains("/native", libPath.split(Pattern.quote(File.pathSeparator))));
+ } else {
+ assertNull("Native library should not be set.", libPath);
+ }
+
+ // Checks below are the same for both driver and non-driver mode.
+ assertEquals(dummyPropsFile.getAbsolutePath(), findArgValue(cmd, parser.PROPERTIES_FILE));
+ assertEquals("yarn", findArgValue(cmd, parser.MASTER));
+ assertEquals(deployMode, findArgValue(cmd, parser.DEPLOY_MODE));
+ assertEquals("my.Class", findArgValue(cmd, parser.CLASS));
+ assertEquals("MyApp", findArgValue(cmd, parser.NAME));
+
+ boolean appArgsOk = false;
+ for (int i = 0; i < cmd.size(); i++) {
+ if (cmd.get(i).equals("/foo")) {
+ assertEquals("foo", cmd.get(i + 1));
+ assertEquals("bar", cmd.get(i + 2));
+ assertEquals(cmd.size(), i + 3);
+ appArgsOk = true;
+ break;
+ }
+ }
+ assertTrue("App resource and args should be added to command.", appArgsOk);
+
+ Map<String, String> conf = parseConf(cmd, parser);
+ assertEquals("foo", conf.get("spark.foo"));
+ }
+
+ private boolean contains(String needle, String[] haystack) {
+ for (String entry : haystack) {
+ if (entry.equals(needle)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private Map<String, String> parseConf(List<String> cmd, SparkSubmitOptionParser parser) {
+ Map<String, String> conf = new HashMap<String, String>();
+ for (int i = 0; i < cmd.size(); i++) {
+ if (cmd.get(i).equals(parser.CONF)) {
+ String[] val = cmd.get(i + 1).split("=", 2);
+ conf.put(val[0], val[1]);
+ i += 1;
+ }
+ }
+ return conf;
+ }
+
+ private String findArgValue(List<String> cmd, String name) {
+ for (int i = 0; i < cmd.size(); i++) {
+ if (cmd.get(i).equals(name)) {
+ return cmd.get(i + 1);
+ }
+ }
+ fail(String.format("arg '%s' not found", name));
+ return null;
+ }
+
+ private boolean findInStringList(String list, String sep, String needle) {
+ return contains(needle, list.split(sep));
+ }
+
+ private List<String> buildCommand(List<String> args, Map<String, String> env) throws Exception {
+ SparkSubmitCommandBuilder builder = new SparkSubmitCommandBuilder(args);
+ builder.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, System.getProperty("spark.test.home"));
+ return builder.buildCommand(env);
+ }
+
+}
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java
new file mode 100644
index 0000000000..f3d2109917
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import static org.apache.spark.launcher.SparkSubmitOptionParser.*;
+
+public class SparkSubmitOptionParserSuite {
+
+ private SparkSubmitOptionParser parser;
+
+ @Before
+ public void setUp() {
+ parser = spy(new DummyParser());
+ }
+
+ @Test
+ public void testAllOptions() {
+ int count = 0;
+ for (String[] optNames : parser.opts) {
+ for (String optName : optNames) {
+ String value = optName + "-value";
+ parser.parse(Arrays.asList(optName, value));
+ count++;
+ verify(parser).handle(eq(optNames[0]), eq(value));
+ verify(parser, times(count)).handle(anyString(), anyString());
+ verify(parser, times(count)).handleExtraArgs(eq(Collections.<String>emptyList()));
+ }
+ }
+
+ for (String[] switchNames : parser.switches) {
+ int switchCount = 0;
+ for (String name : switchNames) {
+ parser.parse(Arrays.asList(name));
+ count++;
+ switchCount++;
+ verify(parser, times(switchCount)).handle(eq(switchNames[0]), same((String) null));
+ verify(parser, times(count)).handle(anyString(), any(String.class));
+ verify(parser, times(count)).handleExtraArgs(eq(Collections.<String>emptyList()));
+ }
+ }
+ }
+
+ @Test
+ public void testExtraOptions() {
+ List<String> args = Arrays.asList(parser.MASTER, parser.MASTER, "foo", "bar");
+ parser.parse(args);
+ verify(parser).handle(eq(parser.MASTER), eq(parser.MASTER));
+ verify(parser).handleUnknown(eq("foo"));
+ verify(parser).handleExtraArgs(eq(Arrays.asList("bar")));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testMissingArg() {
+ parser.parse(Arrays.asList(parser.MASTER));
+ }
+
+ @Test
+ public void testEqualSeparatedOption() {
+ List<String> args = Arrays.asList(parser.MASTER + "=" + parser.MASTER);
+ parser.parse(args);
+ verify(parser).handle(eq(parser.MASTER), eq(parser.MASTER));
+ verify(parser).handleExtraArgs(eq(Collections.<String>emptyList()));
+ }
+
+ private static class DummyParser extends SparkSubmitOptionParser {
+
+ @Override
+ protected boolean handle(String opt, String value) {
+ return true;
+ }
+
+ @Override
+ protected boolean handleUnknown(String opt) {
+ return false;
+ }
+
+ @Override
+ protected void handleExtraArgs(List<String> extra) {
+
+ }
+
+ }
+
+}
diff --git a/launcher/src/test/resources/log4j.properties b/launcher/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..00c20ad69c
--- /dev/null
+++ b/launcher/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file core/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+
+# Some tests will set "test.name" to avoid overwriting the main log file.
+log4j.appender.file.file=target/unit-tests${test.name}.log
+
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN
diff --git a/make-distribution.sh b/make-distribution.sh
index dd990d4b96..82d33408cd 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -127,6 +127,7 @@ if [ ! $(command -v "$MVN") ] ; then
fi
VERSION=$("$MVN" help:evaluate -Dexpression=project.version 2>/dev/null | grep -v "INFO" | tail -n 1)
+SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version 2>/dev/null | grep -v "INFO" | tail -n 1)
SPARK_HADOOP_VERSION=$("$MVN" help:evaluate -Dexpression=hadoop.version $@ 2>/dev/null\
| grep -v "INFO"\
| tail -n 1)
@@ -196,6 +197,7 @@ echo "Build flags: $@" >> "$DISTDIR/RELEASE"
# Copy jars
cp "$SPARK_HOME"/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/lib/"
cp "$SPARK_HOME"/examples/target/scala*/spark-examples*.jar "$DISTDIR/lib/"
+cp "$SPARK_HOME"/launcher/target/spark-launcher_$SCALA_VERSION-$VERSION.jar "$DISTDIR/lib/"
# This will fail if the -Pyarn profile is not provided
# In this case, silence the error and ignore the return code of this command
cp "$SPARK_HOME"/network/yarn/target/scala*/spark-*-yarn-shuffle.jar "$DISTDIR/lib/" &> /dev/null || :
diff --git a/pom.xml b/pom.xml
index 51bef30f9c..a19da73cf4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,7 @@
<module>external/zeromq</module>
<module>examples</module>
<module>repl</module>
+ <module>launcher</module>
</modules>
<properties>
@@ -1195,7 +1196,7 @@
</environmentVariables>
<systemProperties>
<java.awt.headless>true</java.awt.headless>
- <spark.test.home>${session.executionRootDirectory}</spark.test.home>
+ <spark.test.home>${spark.test.home}</spark.test.home>
<spark.testing>1</spark.testing>
<spark.ui.enabled>false</spark.ui.enabled>
<spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 4f17df59f4..35e748f26b 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -34,11 +34,11 @@ object BuildCommons {
val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl,
sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka,
- streamingMqtt, streamingTwitter, streamingZeromq) =
+ streamingMqtt, streamingTwitter, streamingZeromq, launcher) =
Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl",
"sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink",
"streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
- "streaming-zeromq").map(ProjectRef(buildLocation, _))
+ "streaming-zeromq", "launcher").map(ProjectRef(buildLocation, _))
val optionallyEnabledProjects@Seq(yarn, yarnStable, java8Tests, sparkGangliaLgpl,
sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
@@ -155,8 +155,9 @@ object SparkBuild extends PomBuild {
(allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings))
// TODO: Add Sql to mima checks
+ // TODO: remove launcher from this list after 1.3.
allProjects.filterNot(x => Seq(spark, sql, hive, hiveThriftServer, catalyst, repl,
- networkCommon, networkShuffle, networkYarn).contains(x)).foreach {
+ networkCommon, networkShuffle, networkYarn, launcher).contains(x)).foreach {
x => enable(MimaBuild.mimaSettings(sparkHome, x))(x)
}
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 936857e75c..43d2cf5171 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -41,7 +41,7 @@ def launch_gateway():
submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS")
submit_args = submit_args if submit_args is not None else ""
submit_args = shlex.split(submit_args)
- command = [os.path.join(SPARK_HOME, script)] + submit_args + ["pyspark-shell"]
+ command = [os.path.join(SPARK_HOME, script)] + submit_args
# Start a socket that will be used by PythonGatewayServer to communicate its port to us
callback_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -58,7 +58,6 @@ def launch_gateway():
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_func():
signal.signal(signal.SIGINT, signal.SIG_IGN)
- env["IS_SUBPROCESS"] = "1" # tell JVM to exit after python exits
proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
else:
# preexec_fn not supported on Windows
diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh
index 5e812a1d91..92e76a3fe6 100755
--- a/sbin/spark-daemon.sh
+++ b/sbin/spark-daemon.sh
@@ -121,45 +121,63 @@ if [ "$SPARK_NICENESS" = "" ]; then
export SPARK_NICENESS=0
fi
+run_command() {
+ mode="$1"
+ shift
-case $option in
+ mkdir -p "$SPARK_PID_DIR"
- (start|spark-submit)
+ if [ -f "$pid" ]; then
+ TARGET_ID="$(cat "$pid")"
+ if [[ $(ps -p "$TARGET_ID" -o args=) =~ $command ]]; then
+ echo "$command running as process $TARGET_ID. Stop it first."
+ exit 1
+ fi
+ fi
- mkdir -p "$SPARK_PID_DIR"
+ if [ "$SPARK_MASTER" != "" ]; then
+ echo rsync from "$SPARK_MASTER"
+ rsync -a -e ssh --delete --exclude=.svn --exclude='logs/*' --exclude='contrib/hod/logs/*' "$SPARK_MASTER/" "$SPARK_HOME"
+ fi
- if [ -f $pid ]; then
- TARGET_ID="$(cat "$pid")"
- if [[ $(ps -p "$TARGET_ID" -o args=) =~ $command ]]; then
- echo "$command running as process $TARGET_ID. Stop it first."
- exit 1
- fi
- fi
+ spark_rotate_log "$log"
+ echo "starting $command, logging to $log"
+
+ case "$mode" in
+ (class)
+ nohup nice -n "$SPARK_NICENESS" "$SPARK_PREFIX"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null &
+ newpid="$!"
+ ;;
+
+ (submit)
+ nohup nice -n "$SPARK_NICENESS" "$SPARK_PREFIX"/bin/spark-submit --class $command "$@" >> "$log" 2>&1 < /dev/null &
+ newpid="$!"
+ ;;
+
+ (*)
+ echo "unknown mode: $mode"
+ exit 1
+ ;;
+ esac
+
+ echo "$newpid" > "$pid"
+ sleep 2
+ # Check if the process has died; in that case we'll tail the log so the user can see
+ if [[ ! $(ps -p "$newpid" -o args=) =~ $command ]]; then
+ echo "failed to launch $command:"
+ tail -2 "$log" | sed 's/^/ /'
+ echo "full log in $log"
+ fi
+}
- if [ "$SPARK_MASTER" != "" ]; then
- echo rsync from "$SPARK_MASTER"
- rsync -a -e ssh --delete --exclude=.svn --exclude='logs/*' --exclude='contrib/hod/logs/*' $SPARK_MASTER/ "$SPARK_HOME"
- fi
+case $option in
- spark_rotate_log "$log"
- echo "starting $command, logging to $log"
- if [ $option == spark-submit ]; then
- source "$SPARK_HOME"/bin/utils.sh
- gatherSparkSubmitOpts "$@"
- nohup nice -n $SPARK_NICENESS "$SPARK_PREFIX"/bin/spark-submit --class $command \
- "${SUBMISSION_OPTS[@]}" spark-internal "${APPLICATION_OPTS[@]}" >> "$log" 2>&1 < /dev/null &
- else
- nohup nice -n $SPARK_NICENESS "$SPARK_PREFIX"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null &
- fi
- newpid=$!
- echo $newpid > $pid
- sleep 2
- # Check if the process has died; in that case we'll tail the log so the user can see
- if [[ ! $(ps -p "$newpid" -o args=) =~ $command ]]; then
- echo "failed to launch $command:"
- tail -2 "$log" | sed 's/^/ /'
- echo "full log in $log"
- fi
+ (submit)
+ run_command submit "$@"
+ ;;
+
+ (start)
+ run_command class "$@"
;;
(stop)
diff --git a/sbin/start-thriftserver.sh b/sbin/start-thriftserver.sh
index 070cc7a87e..5b0aeb177f 100755
--- a/sbin/start-thriftserver.sh
+++ b/sbin/start-thriftserver.sh
@@ -52,4 +52,4 @@ fi
export SUBMIT_USAGE_FUNCTION=usage
-exec "$FWDIR"/sbin/spark-daemon.sh spark-submit $CLASS 1 "$@"
+exec "$FWDIR"/sbin/spark-daemon.sh submit $CLASS 1 "$@"