author     Marcelo Vanzin <vanzin@cloudera.com>        2015-03-11 01:03:01 -0700
committer  Patrick Wendell <patrick@databricks.com>    2015-03-11 01:03:01 -0700
commit     517975d89d40a77c7186f488547eed11f79c1e97 (patch)
tree       51bbc6c180bc28ae45a61511d44f5367f357ffd0
parent     2d4e00efe2cf179935ae108a68f28edf6e5a1628 (diff)
download   spark-517975d89d40a77c7186f488547eed11f79c1e97.tar.gz
           spark-517975d89d40a77c7186f488547eed11f79c1e97.tar.bz2
           spark-517975d89d40a77c7186f488547eed11f79c1e97.zip
[SPARK-4924] Add a library for launching Spark jobs programmatically.
This change encapsulates all the logic involved in launching a Spark job into a small Java library that can be easily embedded into other applications.

The overall goal of this change is twofold, as described in the bug:

- Provide a public API for launching Spark processes. This is a common request from users and currently there's no good answer for it.
- Remove a lot of the duplicated code and other coupling that exists in the different parts of Spark that deal with launching processes.

A lot of the duplication was due to different code needed to build an application's classpath (and the bootstrapper needed to run the driver in certain situations), and also different code needed to parse spark-submit command line options in different contexts. The change centralizes those as much as possible so that all code paths can rely on the library for handling those appropriately.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #3916 from vanzin/SPARK-4924 and squashes the following commits:

18c7e4d [Marcelo Vanzin] Fix make-distribution.sh.
2ce741f [Marcelo Vanzin] Add lots of quotes.
3b28a75 [Marcelo Vanzin] Update new pom.
a1b8af1 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
897141f [Marcelo Vanzin] Review feedback.
e2367d2 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
28cd35e [Marcelo Vanzin] Remove stale comment.
b1d86b0 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
00505f9 [Marcelo Vanzin] Add blurb about new API in the programming guide.
5f4ddcc [Marcelo Vanzin] Better usage messages.
92a9cfb [Marcelo Vanzin] Fix Win32 launcher, usage.
6184c07 [Marcelo Vanzin] Rename field.
4c19196 [Marcelo Vanzin] Update comment.
7e66c18 [Marcelo Vanzin] Fix pyspark tests.
0031a8e [Marcelo Vanzin] Review feedback.
c12d84b [Marcelo Vanzin] Review feedback. And fix spark-submit on Windows.
e2d4d71 [Marcelo Vanzin] Simplify some code used to launch pyspark.
43008a7 [Marcelo Vanzin] Don't make builder extend SparkLauncher.
b4d6912 [Marcelo Vanzin] Use spark-submit script in SparkLauncher.
28b1434 [Marcelo Vanzin] Add a comment.
304333a [Marcelo Vanzin] Fix propagation of properties file arg.
bb67b93 [Marcelo Vanzin] Remove unrelated Yarn change (that is also wrong).
8ec0243 [Marcelo Vanzin] Add missing newline.
95ddfa8 [Marcelo Vanzin] Fix handling of --help for spark-class command builder.
72da7ec [Marcelo Vanzin] Rename SparkClassLauncher.
62978e4 [Marcelo Vanzin] Minor cleanup of Windows code path.
9cd5b44 [Marcelo Vanzin] Make all non-public APIs package-private.
e4c80b6 [Marcelo Vanzin] Reorganize the code so that only SparkLauncher is public.
e50dc5e [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
de81da2 [Marcelo Vanzin] Fix CommandUtils.
86a87bf [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
2061967 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
46d46da [Marcelo Vanzin] Clean up a test and make it more future-proof.
b93692a [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
ad03c48 [Marcelo Vanzin] Revert "Fix a thread-safety issue in "local" mode."
0b509d0 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
23aa2a9 [Marcelo Vanzin] Read java-opts from conf dir, not spark home.
7cff919 [Marcelo Vanzin] Javadoc updates.
eae4d8e [Marcelo Vanzin] Fix new unit tests on Windows.
e570fb5 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
44cd5f7 [Marcelo Vanzin] Add package-info.java, clean up javadocs.
f7cacff [Marcelo Vanzin] Remove "launch Spark in new thread" feature.
7ed8859 [Marcelo Vanzin] Some more feedback.
54cd4fd [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
61919df [Marcelo Vanzin] Clean leftover debug statement.
aae5897 [Marcelo Vanzin] Use launcher classes instead of jars in non-release mode.
e584fc3 [Marcelo Vanzin] Rework command building a little bit.
525ef5b [Marcelo Vanzin] Rework Unix spark-class to handle argument with newlines.
8ac4e92 [Marcelo Vanzin] Minor test cleanup.
e946a99 [Marcelo Vanzin] Merge PySparkLauncher into SparkSubmitCliLauncher.
c617539 [Marcelo Vanzin] Review feedback round 1.
fc6a3e2 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
f26556b [Marcelo Vanzin] Fix a thread-safety issue in "local" mode.
2f4e8b4 [Marcelo Vanzin] Changes needed to make this work with SPARK-4048.
799fc20 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
bb5d324 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
53faef1 [Marcelo Vanzin] Merge branch 'master' into SPARK-4924
a7936ef [Marcelo Vanzin] Fix pyspark tests.
656374e [Marcelo Vanzin] Mima fixes.
4d511e7 [Marcelo Vanzin] Fix tools search code.
7a01e4a [Marcelo Vanzin] Fix pyspark on Yarn.
1b3f6e9 [Marcelo Vanzin] Call SparkSubmit from spark-class launcher for unknown classes.
25c5ae6 [Marcelo Vanzin] Centralize SparkSubmit command line parsing.
27be98a [Marcelo Vanzin] Modify Spark to use launcher lib.
6f70eea [Marcelo Vanzin] [SPARK-4924] Add a library for launching Spark jobs programatically.
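As context for the file changes listed below, the new public entry point added by this patch is org.apache.spark.launcher.SparkLauncher, under launcher/src/main/java. A minimal sketch of embedding it in another application follows; the jar path, main class, and master URL are placeholders, and this mirrors the builder-style usage the patch documents rather than adding anything beyond it.

import org.apache.spark.launcher.SparkLauncher;

public class MyLauncherExample {
  public static void main(String[] args) throws Exception {
    // Builds and starts a spark-submit child process; launch() returns
    // a plain java.lang.Process that the caller can monitor.
    Process spark = new SparkLauncher()
        .setAppResource("/path/to/my-app.jar")   // placeholder jar
        .setMainClass("my.spark.app.Main")       // placeholder class
        .setMaster("local[2]")
        .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
        .launch();
    spark.waitFor();
  }
}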
-rw-r--r--  .gitignore | 1
-rw-r--r--  bin/compute-classpath.cmd | 124
-rwxr-xr-x  bin/compute-classpath.sh | 161
-rw-r--r--  bin/load-spark-env.sh | 8
-rwxr-xr-x  bin/pyspark | 59
-rw-r--r--  bin/pyspark2.cmd | 57
-rwxr-xr-x  bin/run-example | 2
-rwxr-xr-x  bin/spark-class | 180
-rw-r--r--  bin/spark-class2.cmd | 141
-rwxr-xr-x  bin/spark-shell | 23
-rw-r--r--  bin/spark-shell2.cmd | 27
-rwxr-xr-x  bin/spark-sql | 20
-rwxr-xr-x  bin/spark-submit | 66
-rw-r--r--  bin/spark-submit2.cmd | 71
-rwxr-xr-x  bin/utils.sh | 60
-rw-r--r--  bin/windows-utils.cmd | 60
-rw-r--r--  core/pom.xml | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 157
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala | 170
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala | 46
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/launcher/SparkSubmitArgumentsParser.scala | 25
-rw-r--r--  core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala | 50
-rw-r--r--  docs/programming-guide.md | 5
-rw-r--r--  launcher/pom.xml | 83
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java | 362
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java | 296
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/Main.java | 173
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java | 108
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java | 279
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java | 327
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java | 224
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/package-info.java | 45
-rw-r--r--  launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java | 101
-rw-r--r--  launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java | 94
-rw-r--r--  launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java | 278
-rw-r--r--  launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java | 108
-rw-r--r--  launcher/src/test/resources/log4j.properties | 31
-rwxr-xr-x  make-distribution.sh | 2
-rw-r--r--  pom.xml | 3
-rw-r--r--  project/SparkBuild.scala | 7
-rw-r--r--  python/pyspark/java_gateway.py | 3
-rwxr-xr-x  sbin/spark-daemon.sh | 84
-rwxr-xr-x  sbin/start-thriftserver.sh | 2
44 files changed, 2891 insertions(+), 1238 deletions(-)
diff --git a/.gitignore b/.gitignore
index 9757054a50..d162fa9cca 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@
*.iml
*.iws
*.pyc
+*.pyo
.idea/
.idea_modules/
build/*.jar
diff --git a/bin/compute-classpath.cmd b/bin/compute-classpath.cmd
deleted file mode 100644
index 088f993954..0000000000
--- a/bin/compute-classpath.cmd
+++ /dev/null
@@ -1,124 +0,0 @@
-@echo off
-
-rem
-rem Licensed to the Apache Software Foundation (ASF) under one or more
-rem contributor license agreements. See the NOTICE file distributed with
-rem this work for additional information regarding copyright ownership.
-rem The ASF licenses this file to You under the Apache License, Version 2.0
-rem (the "License"); you may not use this file except in compliance with
-rem the License. You may obtain a copy of the License at
-rem
-rem http://www.apache.org/licenses/LICENSE-2.0
-rem
-rem Unless required by applicable law or agreed to in writing, software
-rem distributed under the License is distributed on an "AS IS" BASIS,
-rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-rem See the License for the specific language governing permissions and
-rem limitations under the License.
-rem
-
-rem This script computes Spark's classpath and prints it to stdout; it's used by both the "run"
-rem script and the ExecutorRunner in standalone cluster mode.
-
-rem If we're called from spark-class2.cmd, it already set enabledelayedexpansion and setting
-rem it here would stop us from affecting its copy of the CLASSPATH variable; otherwise we
-rem need to set it here because we use !datanucleus_jars! below.
-if "%DONT_PRINT_CLASSPATH%"=="1" goto skip_delayed_expansion
-setlocal enabledelayedexpansion
-:skip_delayed_expansion
-
-set SCALA_VERSION=2.10
-
-rem Figure out where the Spark framework is installed
-set FWDIR=%~dp0..\
-
-rem Load environment variables from conf\spark-env.cmd, if it exists
-if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
-
-rem Build up classpath
-set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%
-
-if not "x%SPARK_CONF_DIR%"=="x" (
- set CLASSPATH=%CLASSPATH%;%SPARK_CONF_DIR%
-) else (
- set CLASSPATH=%CLASSPATH%;%FWDIR%conf
-)
-
-if exist "%FWDIR%RELEASE" (
- for %%d in ("%FWDIR%lib\spark-assembly*.jar") do (
- set ASSEMBLY_JAR=%%d
- )
-) else (
- for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do (
- set ASSEMBLY_JAR=%%d
- )
-)
-
-set CLASSPATH=%CLASSPATH%;%ASSEMBLY_JAR%
-
-rem When Hive support is needed, Datanucleus jars must be included on the classpath.
-rem Datanucleus jars do not work if only included in the uber jar as plugin.xml metadata is lost.
-rem Both sbt and maven will populate "lib_managed/jars/" with the datanucleus jars when Spark is
-rem built with Hive, so look for them there.
-if exist "%FWDIR%RELEASE" (
- set datanucleus_dir=%FWDIR%lib
-) else (
- set datanucleus_dir=%FWDIR%lib_managed\jars
-)
-set "datanucleus_jars="
-for %%d in ("%datanucleus_dir%\datanucleus-*.jar") do (
- set datanucleus_jars=!datanucleus_jars!;%%d
-)
-set CLASSPATH=%CLASSPATH%;%datanucleus_jars%
-
-set SPARK_CLASSES=%FWDIR%core\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%repl\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%mllib\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%bagel\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%graphx\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%streaming\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%tools\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%sql\catalyst\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%sql\core\target\scala-%SCALA_VERSION%\classes
-set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%sql\hive\target\scala-%SCALA_VERSION%\classes
-
-set SPARK_TEST_CLASSES=%FWDIR%core\target\scala-%SCALA_VERSION%\test-classes
-set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%repl\target\scala-%SCALA_VERSION%\test-classes
-set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%mllib\target\scala-%SCALA_VERSION%\test-classes
-set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%bagel\target\scala-%SCALA_VERSION%\test-classes
-set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%graphx\target\scala-%SCALA_VERSION%\test-classes
-set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%streaming\target\scala-%SCALA_VERSION%\test-classes
-set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%sql\catalyst\target\scala-%SCALA_VERSION%\test-classes
-set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%sql\core\target\scala-%SCALA_VERSION%\test-classes
-set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%sql\hive\target\scala-%SCALA_VERSION%\test-classes
-
-if "x%SPARK_TESTING%"=="x1" (
- rem Add test clases to path - note, add SPARK_CLASSES and SPARK_TEST_CLASSES before CLASSPATH
- rem so that local compilation takes precedence over assembled jar
- set CLASSPATH=%SPARK_CLASSES%;%SPARK_TEST_CLASSES%;%CLASSPATH%
-)
-
-rem Add hadoop conf dir - else FileSystem.*, etc fail
-rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
-rem the configurtion files.
-if "x%HADOOP_CONF_DIR%"=="x" goto no_hadoop_conf_dir
- set CLASSPATH=%CLASSPATH%;%HADOOP_CONF_DIR%
-:no_hadoop_conf_dir
-
-if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir
- set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%
-:no_yarn_conf_dir
-
-rem To allow for distributions to append needed libraries to the classpath (e.g. when
-rem using the "hadoop-provided" profile to build Spark), check SPARK_DIST_CLASSPATH and
-rem append it to tbe final classpath.
-if not "x%$SPARK_DIST_CLASSPATH%"=="x" (
- set CLASSPATH=%CLASSPATH%;%SPARK_DIST_CLASSPATH%
-)
-
-rem A bit of a hack to allow calling this script within run2.cmd without seeing output
-if "%DONT_PRINT_CLASSPATH%"=="1" goto exit
-
-echo %CLASSPATH%
-
-:exit
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
deleted file mode 100755
index f4f6b7b909..0000000000
--- a/bin/compute-classpath.sh
+++ /dev/null
@@ -1,161 +0,0 @@
-#!/usr/bin/env bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# This script computes Spark's classpath and prints it to stdout; it's used by both the "run"
-# script and the ExecutorRunner in standalone cluster mode.
-
-# Figure out where Spark is installed
-FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
-
-. "$FWDIR"/bin/load-spark-env.sh
-
-if [ -n "$SPARK_CLASSPATH" ]; then
- CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH"
-else
- CLASSPATH="$SPARK_SUBMIT_CLASSPATH"
-fi
-
-# Build up classpath
-if [ -n "$SPARK_CONF_DIR" ]; then
- CLASSPATH="$CLASSPATH:$SPARK_CONF_DIR"
-else
- CLASSPATH="$CLASSPATH:$FWDIR/conf"
-fi
-
-ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SPARK_SCALA_VERSION"
-
-if [ -n "$JAVA_HOME" ]; then
- JAR_CMD="$JAVA_HOME/bin/jar"
-else
- JAR_CMD="jar"
-fi
-
-# A developer option to prepend more recently compiled Spark classes
-if [ -n "$SPARK_PREPEND_CLASSES" ]; then
- echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\
- "classes ahead of assembly." >&2
- # Spark classes
- CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SPARK_SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SPARK_SCALA_VERSION/classes"
- # Jars for shaded deps in their original form (copied here during build)
- CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*"
-fi
-
-# Use spark-assembly jar from either RELEASE or assembly directory
-if [ -f "$FWDIR/RELEASE" ]; then
- assembly_folder="$FWDIR"/lib
-else
- assembly_folder="$ASSEMBLY_DIR"
-fi
-
-num_jars=0
-
-for f in "${assembly_folder}"/spark-assembly*hadoop*.jar; do
- if [[ ! -e "$f" ]]; then
- echo "Failed to find Spark assembly in $assembly_folder" 1>&2
- echo "You need to build Spark before running this program." 1>&2
- exit 1
- fi
- ASSEMBLY_JAR="$f"
- num_jars=$((num_jars+1))
-done
-
-if [ "$num_jars" -gt "1" ]; then
- echo "Found multiple Spark assembly jars in $assembly_folder:" 1>&2
- ls "${assembly_folder}"/spark-assembly*hadoop*.jar 1>&2
- echo "Please remove all but one jar." 1>&2
- exit 1
-fi
-
-# Verify that versions of java used to build the jars and run Spark are compatible
-jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1)
-if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
- echo "Loading Spark jar with '$JAR_CMD' failed. " 1>&2
- echo "This is likely because Spark was compiled with Java 7 and run " 1>&2
- echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark " 1>&2
- echo "or build Spark with Java 6." 1>&2
- exit 1
-fi
-
-CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR"
-
-# When Hive support is needed, Datanucleus jars must be included on the classpath.
-# Datanucleus jars do not work if only included in the uber jar as plugin.xml metadata is lost.
-# Both sbt and maven will populate "lib_managed/jars/" with the datanucleus jars when Spark is
-# built with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark
-# assembly is built for Hive, before actually populating the CLASSPATH with the jars.
-# Note that this check order is faster (by up to half a second) in the case where Hive is not used.
-if [ -f "$FWDIR/RELEASE" ]; then
- datanucleus_dir="$FWDIR"/lib
-else
- datanucleus_dir="$FWDIR"/lib_managed/jars
-fi
-
-datanucleus_jars="$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar$")"
-datanucleus_jars="$(echo "$datanucleus_jars" | tr "\n" : | sed s/:$//g)"
-
-if [ -n "$datanucleus_jars" ]; then
- hive_files=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" org/apache/hadoop/hive/ql/exec 2>/dev/null)
- if [ -n "$hive_files" ]; then
- echo "Spark assembly has been built with Hive, including Datanucleus jars on classpath" 1>&2
- CLASSPATH="$CLASSPATH:$datanucleus_jars"
- fi
-fi
-
-# Add test classes if we're running from SBT or Maven with SPARK_TESTING set to 1
-if [[ $SPARK_TESTING == 1 ]]; then
- CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SPARK_SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SPARK_SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SPARK_SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SPARK_SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/test-classes"
-fi
-
-# Add hadoop conf dir if given -- otherwise FileSystem.*, etc fail !
-# Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
-# the configurtion files.
-if [ -n "$HADOOP_CONF_DIR" ]; then
- CLASSPATH="$CLASSPATH:$HADOOP_CONF_DIR"
-fi
-if [ -n "$YARN_CONF_DIR" ]; then
- CLASSPATH="$CLASSPATH:$YARN_CONF_DIR"
-fi
-
-# To allow for distributions to append needed libraries to the classpath (e.g. when
-# using the "hadoop-provided" profile to build Spark), check SPARK_DIST_CLASSPATH and
-# append it to tbe final classpath.
-if [ -n "$SPARK_DIST_CLASSPATH" ]; then
- CLASSPATH="$CLASSPATH:$SPARK_DIST_CLASSPATH"
-fi
-
-echo "$CLASSPATH"
diff --git a/bin/load-spark-env.sh b/bin/load-spark-env.sh
index 356b3d49b2..2d7070c25d 100644
--- a/bin/load-spark-env.sh
+++ b/bin/load-spark-env.sh
@@ -41,9 +41,9 @@ fi
if [ -z "$SPARK_SCALA_VERSION" ]; then
- ASSEMBLY_DIR2="$FWDIR/assembly/target/scala-2.11"
- ASSEMBLY_DIR1="$FWDIR/assembly/target/scala-2.10"
-
+ ASSEMBLY_DIR2="$SPARK_HOME/assembly/target/scala-2.11"
+ ASSEMBLY_DIR1="$SPARK_HOME/assembly/target/scala-2.10"
+
if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
echo -e "Presence of build for both scala versions(SCALA 2.10 and SCALA 2.11) detected." 1>&2
echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION=2.11 in spark-env.sh.' 1>&2
@@ -54,5 +54,5 @@ if [ -z "$SPARK_SCALA_VERSION" ]; then
export SPARK_SCALA_VERSION="2.11"
else
export SPARK_SCALA_VERSION="2.10"
- fi
+ fi
fi
diff --git a/bin/pyspark b/bin/pyspark
index 0b4f695dd0..e7f6a1a072 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -18,36 +18,24 @@
#
# Figure out where Spark is installed
-FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
+export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-# Export this as SPARK_HOME
-export SPARK_HOME="$FWDIR"
-
-source "$FWDIR/bin/utils.sh"
-
-source "$FWDIR"/bin/load-spark-env.sh
+source "$SPARK_HOME"/bin/load-spark-env.sh
function usage() {
+ if [ -n "$1" ]; then
+ echo $1
+ fi
echo "Usage: ./bin/pyspark [options]" 1>&2
- "$FWDIR"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
- exit 0
+ "$SPARK_HOME"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+ exit $2
}
+export -f usage
if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
usage
fi
-# Exit if the user hasn't compiled Spark
-if [ ! -f "$FWDIR/RELEASE" ]; then
- # Exit if the user hasn't compiled Spark
- ls "$FWDIR"/assembly/target/scala-$SPARK_SCALA_VERSION/spark-assembly*hadoop*.jar >& /dev/null
- if [[ $? != 0 ]]; then
- echo "Failed to find Spark assembly in $FWDIR/assembly/target" 1>&2
- echo "You need to build Spark before running this program" 1>&2
- exit 1
- fi
-fi
-
# In Spark <= 1.1, setting IPYTHON=1 would cause the driver to be launched using the `ipython`
# executable, while the worker would still be launched using PYSPARK_PYTHON.
#
@@ -95,26 +83,13 @@ export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
-export PYTHONSTARTUP="$FWDIR/python/pyspark/shell.py"
-
-# Build up arguments list manually to preserve quotes and backslashes.
-# We export Spark submit arguments as an environment variable because shell.py must run as a
-# PYTHONSTARTUP script, which does not take in arguments. This is required for IPython notebooks.
-SUBMIT_USAGE_FUNCTION=usage
-gatherSparkSubmitOpts "$@"
-PYSPARK_SUBMIT_ARGS=""
-whitespace="[[:space:]]"
-for i in "${SUBMISSION_OPTS[@]}"; do
- if [[ $i =~ \" ]]; then i=$(echo $i | sed 's/\"/\\\"/g'); fi
- if [[ $i =~ $whitespace ]]; then i=\"$i\"; fi
- PYSPARK_SUBMIT_ARGS="$PYSPARK_SUBMIT_ARGS $i"
-done
-export PYSPARK_SUBMIT_ARGS
+export PYTHONSTARTUP="$SPARK_HOME/python/pyspark/shell.py"
# For pyspark tests
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
+ export PYSPARK_SUBMIT_ARGS=pyspark-shell
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
else
@@ -123,14 +98,6 @@ if [[ -n "$SPARK_TESTING" ]]; then
exit
fi
-# If a python file is provided, directly run spark-submit.
-if [[ "$1" =~ \.py$ ]]; then
- echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0." 1>&2
- echo -e "Use ./bin/spark-submit <python file>\n" 1>&2
- primary="$1"
- shift
- gatherSparkSubmitOpts "$@"
- exec "$FWDIR"/bin/spark-submit "${SUBMISSION_OPTS[@]}" "$primary" "${APPLICATION_OPTS[@]}"
-else
- exec "$PYSPARK_DRIVER_PYTHON" $PYSPARK_DRIVER_PYTHON_OPTS
-fi
+export PYSPARK_DRIVER_PYTHON
+export PYSPARK_DRIVER_PYTHON_OPTS
+exec "$SPARK_HOME"/bin/spark-submit pyspark-shell-main "$@"
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index a542ec80b4..4f5eb5e206 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -17,59 +17,22 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
-set SCALA_VERSION=2.10
-
rem Figure out where the Spark framework is installed
-set FWDIR=%~dp0..\
-
-rem Export this as SPARK_HOME
-set SPARK_HOME=%FWDIR%
-
-rem Test whether the user has built Spark
-if exist "%FWDIR%RELEASE" goto skip_build_test
-set FOUND_JAR=0
-for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do (
- set FOUND_JAR=1
-)
-if [%FOUND_JAR%] == [0] (
- echo Failed to find Spark assembly JAR.
- echo You need to build Spark before running this program.
- goto exit
-)
-:skip_build_test
+set SPARK_HOME=%~dp0..
rem Load environment variables from conf\spark-env.cmd, if it exists
-if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
+if exist "%SPARK_HOME%\conf\spark-env.cmd" call "%SPARK_HOME%\conf\spark-env.cmd"
rem Figure out which Python to use.
-if [%PYSPARK_PYTHON%] == [] set PYSPARK_PYTHON=python
+if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
+ set PYSPARK_DRIVER_PYTHON=python
+ if not [%PYSPARK_PYTHON%] == [] set PYSPARK_DRIVER_PYTHON=%PYSPARK_PYTHON%
+)
-set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
-set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.2.1-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.8.2.1-src.zip;%PYTHONPATH%
set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
-set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
-set PYSPARK_SUBMIT_ARGS=%*
-
-echo Running %PYSPARK_PYTHON% with PYTHONPATH=%PYTHONPATH%
-
-rem Check whether the argument is a file
-for /f %%i in ('echo %1^| findstr /R "\.py"') do (
- set PYTHON_FILE=%%i
-)
-
-if [%PYTHON_FILE%] == [] (
- if [%IPYTHON%] == [1] (
- ipython %IPYTHON_OPTS%
- ) else (
- %PYSPARK_PYTHON%
- )
-) else (
- echo.
- echo WARNING: Running python applications through ./bin/pyspark.cmd is deprecated as of Spark 1.0.
- echo Use ./bin/spark-submit ^<python file^>
- echo.
- "%FWDIR%\bin\spark-submit.cmd" %PYSPARK_SUBMIT_ARGS%
-)
+set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
-:exit
+call %SPARK_HOME%\bin\spark-submit2.cmd pyspark-shell-main %*
diff --git a/bin/run-example b/bin/run-example
index a106411392..798e2caeb8 100755
--- a/bin/run-example
+++ b/bin/run-example
@@ -67,7 +67,7 @@ if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
EXAMPLE_CLASS="org.apache.spark.examples.$EXAMPLE_CLASS"
fi
-"$FWDIR"/bin/spark-submit \
+exec "$FWDIR"/bin/spark-submit \
--master $EXAMPLE_MASTER \
--class $EXAMPLE_CLASS \
"$SPARK_EXAMPLES_JAR" \
diff --git a/bin/spark-class b/bin/spark-class
index 2f0441bb3c..e29b234afa 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -16,89 +16,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-# NOTE: Any changes to this file must be reflected in SparkSubmitDriverBootstrapper.scala!
-
-cygwin=false
-case "`uname`" in
- CYGWIN*) cygwin=true;;
-esac
+set -e
# Figure out where Spark is installed
-FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
+export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-# Export this as SPARK_HOME
-export SPARK_HOME="$FWDIR"
-export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"$SPARK_HOME/conf"}"
-
-. "$FWDIR"/bin/load-spark-env.sh
+. "$SPARK_HOME"/bin/load-spark-env.sh
if [ -z "$1" ]; then
echo "Usage: spark-class <class> [<args>]" 1>&2
exit 1
fi
-if [ -n "$SPARK_MEM" ]; then
- echo -e "Warning: SPARK_MEM is deprecated, please use a more specific config option" 1>&2
- echo -e "(e.g., spark.executor.memory or spark.driver.memory)." 1>&2
-fi
-
-# Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
-DEFAULT_MEM=${SPARK_MEM:-512m}
-
-SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
-
-# Add java opts and memory settings for master, worker, history server, executors, and repl.
-case "$1" in
- # Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
- 'org.apache.spark.deploy.master.Master')
- OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS"
- OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
- ;;
- 'org.apache.spark.deploy.worker.Worker')
- OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS"
- OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
- ;;
- 'org.apache.spark.deploy.history.HistoryServer')
- OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_HISTORY_OPTS"
- OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
- ;;
-
- # Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
- 'org.apache.spark.executor.CoarseGrainedExecutorBackend')
- OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
- OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
- ;;
- 'org.apache.spark.executor.MesosExecutorBackend')
- OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
- OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
- export PYTHONPATH="$FWDIR/python:$PYTHONPATH"
- export PYTHONPATH="$FWDIR/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
- ;;
-
- # Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
- # SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY.
- 'org.apache.spark.deploy.SparkSubmit')
- OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS"
- OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
- if [ -n "$SPARK_SUBMIT_LIBRARY_PATH" ]; then
- if [[ $OSTYPE == darwin* ]]; then
- export DYLD_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:$DYLD_LIBRARY_PATH"
- else
- export LD_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:$LD_LIBRARY_PATH"
- fi
- fi
- if [ -n "$SPARK_SUBMIT_DRIVER_MEMORY" ]; then
- OUR_JAVA_MEM="$SPARK_SUBMIT_DRIVER_MEMORY"
- fi
- ;;
-
- *)
- OUR_JAVA_OPTS="$SPARK_JAVA_OPTS"
- OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
- ;;
-esac
-
# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
@@ -110,83 +39,48 @@ else
exit 1
fi
fi
-JAVA_VERSION=$("$RUNNER" -version 2>&1 | grep 'version' | sed 's/.* version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
-
-# Set JAVA_OPTS to be able to load native libraries and to set heap size
-if [ "$JAVA_VERSION" -ge 18 ]; then
- JAVA_OPTS="$OUR_JAVA_OPTS"
-else
- JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS"
-fi
-JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"
-
-# Load extra JAVA_OPTS from conf/java-opts, if it exists
-if [ -e "$SPARK_CONF_DIR/java-opts" ] ; then
- JAVA_OPTS="$JAVA_OPTS `cat "$SPARK_CONF_DIR"/java-opts`"
-fi
-
-# Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!
-
-TOOLS_DIR="$FWDIR"/tools
-SPARK_TOOLS_JAR=""
-if [ -e "$TOOLS_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-tools*[0-9Tg].jar ]; then
- # Use the JAR from the SBT build
- export SPARK_TOOLS_JAR="`ls "$TOOLS_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-tools*[0-9Tg].jar`"
-fi
-if [ -e "$TOOLS_DIR"/target/spark-tools*[0-9Tg].jar ]; then
- # Use the JAR from the Maven build
- # TODO: this also needs to become an assembly!
- export SPARK_TOOLS_JAR="`ls "$TOOLS_DIR"/target/spark-tools*[0-9Tg].jar`"
-fi
-# Compute classpath using external script
-classpath_output=$("$FWDIR"/bin/compute-classpath.sh)
-if [[ "$?" != "0" ]]; then
- echo "$classpath_output"
- exit 1
-else
- CLASSPATH="$classpath_output"
-fi
+# Look for the launcher. In non-release mode, add the compiled classes directly to the classpath
+# instead of looking for a jar file.
+SPARK_LAUNCHER_CP=
+if [ -f $SPARK_HOME/RELEASE ]; then
+ LAUNCHER_DIR="$SPARK_HOME/lib"
+ num_jars="$(ls -1 "$LAUNCHER_DIR" | grep "^spark-launcher.*\.jar$" | wc -l)"
+ if [ "$num_jars" -eq "0" -a -z "$SPARK_LAUNCHER_CP" ]; then
+ echo "Failed to find Spark launcher in $LAUNCHER_DIR." 1>&2
+ echo "You need to build Spark before running this program." 1>&2
+ exit 1
+ fi
-if [[ "$1" =~ org.apache.spark.tools.* ]]; then
- if test -z "$SPARK_TOOLS_JAR"; then
- echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/" 1>&2
- echo "You need to run \"build/sbt tools/package\" before running $1." 1>&2
+ LAUNCHER_JARS="$(ls -1 "$LAUNCHER_DIR" | grep "^spark-launcher.*\.jar$" || true)"
+ if [ "$num_jars" -gt "1" ]; then
+ echo "Found multiple Spark launcher jars in $LAUNCHER_DIR:" 1>&2
+ echo "$LAUNCHER_JARS" 1>&2
+ echo "Please remove all but one jar." 1>&2
exit 1
fi
- CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
-fi
-if $cygwin; then
- CLASSPATH="`cygpath -wp "$CLASSPATH"`"
- if [ "$1" == "org.apache.spark.tools.JavaAPICompletenessChecker" ]; then
- export SPARK_TOOLS_JAR="`cygpath -w "$SPARK_TOOLS_JAR"`"
+ SPARK_LAUNCHER_CP="${LAUNCHER_DIR}/${LAUNCHER_JARS}"
+else
+ LAUNCHER_DIR="$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION"
+ if [ ! -d "$LAUNCHER_DIR/classes" ]; then
+ echo "Failed to find Spark launcher classes in $LAUNCHER_DIR." 1>&2
+ echo "You need to build Spark before running this program." 1>&2
+ exit 1
fi
+ SPARK_LAUNCHER_CP="$LAUNCHER_DIR/classes"
fi
-export CLASSPATH
-# In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself.
-# Here we must parse the properties file for relevant "spark.driver.*" configs before launching
-# the driver JVM itself. Instead of handling this complexity in Bash, we launch a separate JVM
-# to prepare the launch environment of this driver JVM.
+# The launcher library will print arguments separated by a NULL character, to allow arguments with
+# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
+# an array that will be used to exec the final command.
+CMD=()
+while IFS= read -d '' -r ARG; do
+ CMD+=("$ARG")
+done < <("$RUNNER" -cp "$SPARK_LAUNCHER_CP" org.apache.spark.launcher.Main "$@")
-if [ -n "$SPARK_SUBMIT_BOOTSTRAP_DRIVER" ]; then
- # This is used only if the properties file actually contains these special configs
- # Export the environment variables needed by SparkSubmitDriverBootstrapper
- export RUNNER
- export CLASSPATH
- export JAVA_OPTS
- export OUR_JAVA_MEM
- export SPARK_CLASS=1
- shift # Ignore main class (org.apache.spark.deploy.SparkSubmit) and use our own
- exec "$RUNNER" org.apache.spark.deploy.SparkSubmitDriverBootstrapper "$@"
+if [ "${CMD[0]}" = "usage" ]; then
+ "${CMD[@]}"
else
- # Note: The format of this command is closely echoed in SparkSubmitDriverBootstrapper.scala
- if [ -n "$SPARK_PRINT_LAUNCH_COMMAND" ]; then
- echo -n "Spark Command: " 1>&2
- echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2
- echo -e "========================================\n" 1>&2
- fi
- exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
+ exec "${CMD[@]}"
fi
-
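On the Unix side, the rewritten spark-class above relies on the launcher printing each argument of the final command terminated by a NUL byte, which the while/read -d '' loop reassembles into an array. A simplified sketch of that output convention, loosely modeled on the org.apache.spark.launcher.Main class this patch adds (the method shown here is an illustrative reduction, not the class's actual code):

import java.util.Arrays;
import java.util.List;

class NulSeparatedOutputSketch {
  // Print each argument NUL-terminated so a shell loop using
  // `read -d ''` can recover arguments containing spaces or newlines.
  static void printCommand(List<String> cmd) {
    StringBuilder out = new StringBuilder();
    for (String arg : cmd) {
      out.append(arg).append('\0');
    }
    System.out.print(out.toString());
  }

  public static void main(String[] args) {
    printCommand(Arrays.asList("java", "-cp", "a b.jar", "Main"));
  }
}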
diff --git a/bin/spark-class2.cmd b/bin/spark-class2.cmd
index da46543647..37d22215a0 100644
--- a/bin/spark-class2.cmd
+++ b/bin/spark-class2.cmd
@@ -17,135 +17,54 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
-rem Any changes to this file must be reflected in SparkSubmitDriverBootstrapper.scala!
-
-setlocal enabledelayedexpansion
-
-set SCALA_VERSION=2.10
-
rem Figure out where the Spark framework is installed
-set FWDIR=%~dp0..\
-
-rem Export this as SPARK_HOME
-set SPARK_HOME=%FWDIR%
+set SPARK_HOME=%~dp0..
rem Load environment variables from conf\spark-env.cmd, if it exists
-if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
+if exist "%SPARK_HOME%\conf\spark-env.cmd" call "%SPARK_HOME%\conf\spark-env.cmd"
rem Test that an argument was given
-if not "x%1"=="x" goto arg_given
+if "x%1"=="x" (
echo Usage: spark-class ^<class^> [^<args^>]
- goto exit
-:arg_given
-
-if not "x%SPARK_MEM%"=="x" (
- echo Warning: SPARK_MEM is deprecated, please use a more specific config option
- echo e.g., spark.executor.memory or spark.driver.memory.
+ exit /b 1
)
-rem Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
-set OUR_JAVA_MEM=%SPARK_MEM%
-if "x%OUR_JAVA_MEM%"=="x" set OUR_JAVA_MEM=512m
-
-set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true
-
-rem Add java opts and memory settings for master, worker, history server, executors, and repl.
-rem Master, Worker and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
-if "%1"=="org.apache.spark.deploy.master.Master" (
- set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_MASTER_OPTS%
- if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
-) else if "%1"=="org.apache.spark.deploy.worker.Worker" (
- set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_WORKER_OPTS%
- if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
-) else if "%1"=="org.apache.spark.deploy.history.HistoryServer" (
- set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_HISTORY_OPTS%
- if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
-
-rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
-) else if "%1"=="org.apache.spark.executor.CoarseGrainedExecutorBackend" (
- set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
- if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%
-) else if "%1"=="org.apache.spark.executor.MesosExecutorBackend" (
- set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
- if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%
+set LAUNCHER_CP=0
+if exist %SPARK_HOME%\RELEASE goto find_release_launcher
-rem Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
-rem SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY.
-rem The repl also uses SPARK_REPL_OPTS.
-) else if "%1"=="org.apache.spark.deploy.SparkSubmit" (
- set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_SUBMIT_OPTS% %SPARK_REPL_OPTS%
- if not "x%SPARK_SUBMIT_LIBRARY_PATH%"=="x" (
- set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_SUBMIT_LIBRARY_PATH%
- ) else if not "x%SPARK_LIBRARY_PATH%"=="x" (
- set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_LIBRARY_PATH%
- )
- if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
- if not "x%SPARK_SUBMIT_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_SUBMIT_DRIVER_MEMORY%
-) else (
- set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
- if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
+rem Look for the Spark launcher in both Scala build directories. The launcher doesn't use Scala so
+rem it doesn't really matter which one is picked up. Add the compiled classes directly to the
+rem classpath instead of looking for a jar file, since it's very common for people using sbt to use
+rem the "assembly" target instead of "package".
+set LAUNCHER_CLASSES=%SPARK_HOME%\launcher\target\scala-2.10\classes
+if exist %LAUNCHER_CLASSES% (
+ set LAUNCHER_CP=%LAUNCHER_CLASSES%
)
-
-rem Set JAVA_OPTS to be able to load native libraries and to set heap size
-for /f "tokens=3" %%i in ('java -version 2^>^&1 ^| find "version"') do set jversion=%%i
-for /f "tokens=1 delims=_" %%i in ("%jversion:~1,-1%") do set jversion=%%i
-if "%jversion%" geq "1.8.0" (
- set JAVA_OPTS=%OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
-) else (
- set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
+set LAUNCHER_CLASSES=%SPARK_HOME%\launcher\target\scala-2.11\classes
+if exist %LAUNCHER_CLASSES% (
+ set LAUNCHER_CP=%LAUNCHER_CLASSES%
)
-rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!
+goto check_launcher
-rem Test whether the user has built Spark
-if exist "%FWDIR%RELEASE" goto skip_build_test
-set FOUND_JAR=0
-for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do (
- set FOUND_JAR=1
-)
-if "%FOUND_JAR%"=="0" (
- echo Failed to find Spark assembly JAR.
- echo You need to build Spark before running this program.
- goto exit
+:find_release_launcher
+for %%d in (%SPARK_HOME%\lib\spark-launcher*.jar) do (
+ set LAUNCHER_CP=%%d
)
-:skip_build_test
-set TOOLS_DIR=%FWDIR%tools
-set SPARK_TOOLS_JAR=
-for %%d in ("%TOOLS_DIR%\target\scala-%SCALA_VERSION%\spark-tools*assembly*.jar") do (
- set SPARK_TOOLS_JAR=%%d
+:check_launcher
+if "%LAUNCHER_CP%"=="0" (
+ echo Failed to find Spark launcher JAR.
+ echo You need to build Spark before running this program.
+ exit /b 1
)
-rem Compute classpath using external script
-set DONT_PRINT_CLASSPATH=1
-call "%FWDIR%bin\compute-classpath.cmd"
-set DONT_PRINT_CLASSPATH=0
-set CLASSPATH=%CLASSPATH%;%SPARK_TOOLS_JAR%
-
rem Figure out where java is.
set RUNNER=java
if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
-rem In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself.
-rem Here we must parse the properties file for relevant "spark.driver.*" configs before launching
-rem the driver JVM itself. Instead of handling this complexity here, we launch a separate JVM
-rem to prepare the launch environment of this driver JVM.
-
-rem In this case, leave out the main class (org.apache.spark.deploy.SparkSubmit) and use our own.
-rem Leaving out the first argument is surprisingly difficult to do in Windows. Note that this must
-rem be done here because the Windows "shift" command does not work in a conditional block.
-set BOOTSTRAP_ARGS=
-shift
-:start_parse
-if "%~1" == "" goto end_parse
-set BOOTSTRAP_ARGS=%BOOTSTRAP_ARGS% %~1
-shift
-goto start_parse
-:end_parse
-
-if not [%SPARK_SUBMIT_BOOTSTRAP_DRIVER%] == [] (
- set SPARK_CLASS=1
- "%RUNNER%" org.apache.spark.deploy.SparkSubmitDriverBootstrapper %BOOTSTRAP_ARGS%
-) else (
- "%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
+rem The launcher library prints the command to be executed in a single line suitable for being
+rem executed by the batch interpreter. So read all the output of the launcher into a variable.
+for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %LAUNCHER_CP% org.apache.spark.launcher.Main %*"') do (
+ set SPARK_CMD=%%i
)
-:exit
+%SPARK_CMD%
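On Windows, by contrast, the batch script above reads a single line from the launcher via "for /f" and executes it directly, so the launcher must join and quote the arguments for cmd.exe. A hedged sketch of that joining step; the quoting here is deliberately naive compared to whatever the launcher library actually does for embedded quotes and special characters:

import java.util.Arrays;
import java.util.List;

class WindowsCommandLineSketch {
  // Join arguments into the single line captured into SPARK_CMD,
  // quoting any argument that contains whitespace.
  static String join(List<String> cmd) {
    StringBuilder line = new StringBuilder();
    for (String arg : cmd) {
      if (line.length() > 0) {
        line.append(' ');
      }
      if (arg.matches(".*\\s.*")) {
        line.append('"').append(arg).append('"');
      } else {
        line.append(arg);
      }
    }
    return line.toString();
  }

  public static void main(String[] args) {
    System.out.println(join(Arrays.asList("java", "-cp", "C:\\spark lib\\*", "Main")));
  }
}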
diff --git a/bin/spark-shell b/bin/spark-shell
index cca5aa0676..b3761b5e13 100755
--- a/bin/spark-shell
+++ b/bin/spark-shell
@@ -28,25 +28,24 @@ esac
# Enter posix mode for bash
set -o posix
-## Global script variables
-FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
+export FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
-function usage() {
+usage() {
+ if [ -n "$1" ]; then
+ echo "$1"
+ fi
echo "Usage: ./bin/spark-shell [options]"
"$FWDIR"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
- exit 0
+ exit "$2"
}
+export -f usage
if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
- usage
+ usage "" 0
fi
-source "$FWDIR"/bin/utils.sh
-SUBMIT_USAGE_FUNCTION=usage
-gatherSparkSubmitOpts "$@"
-
# SPARK-4161: scala does not assume use of the java classpath,
-# so we need to add the "-Dscala.usejavacp=true" flag mnually. We
+# so we need to add the "-Dscala.usejavacp=true" flag manually. We
# do this specifically for the Spark shell because the scala REPL
# has its own class loader, and any additional classpath specified
# through spark.driver.extraClassPath is not automatically propagated.
@@ -61,11 +60,11 @@ function main() {
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
- "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"
+ "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "$@"
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
- "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"
+ "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "$@"
fi
}
diff --git a/bin/spark-shell2.cmd b/bin/spark-shell2.cmd
index 1d1a40da31..02f51fe59a 100644
--- a/bin/spark-shell2.cmd
+++ b/bin/spark-shell2.cmd
@@ -25,17 +25,28 @@ if %ERRORLEVEL% equ 0 (
exit /b 0
)
-call %SPARK_HOME%\bin\windows-utils.cmd %*
-if %ERRORLEVEL% equ 1 (
+rem SPARK-4161: scala does not assume use of the java classpath,
+rem so we need to add the "-Dscala.usejavacp=true" flag manually. We
+rem do this specifically for the Spark shell because the scala REPL
+rem has its own class loader, and any additional classpath specified
+rem through spark.driver.extraClassPath is not automatically propagated.
+if "x%SPARK_SUBMIT_OPTS%"=="x" (
+ set SPARK_SUBMIT_OPTS=-Dscala.usejavacp=true
+ goto run_shell
+)
+set SPARK_SUBMIT_OPTS="%SPARK_SUBMIT_OPTS% -Dscala.usejavacp=true"
+
+:run_shell
+call %SPARK_HOME%\bin\spark-submit2.cmd --class org.apache.spark.repl.Main %*
+set SPARK_ERROR_LEVEL=%ERRORLEVEL%
+if not "x%SPARK_LAUNCHER_USAGE_ERROR%"=="x" (
call :usage
exit /b 1
)
-
-cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd --class org.apache.spark.repl.Main %SUBMISSION_OPTS% spark-shell %APPLICATION_OPTS%
-
-exit /b 0
+exit /b %SPARK_ERROR_LEVEL%
:usage
+echo %SPARK_LAUNCHER_USAGE_ERROR%
echo "Usage: .\bin\spark-shell.cmd [options]" >&2
-%SPARK_HOME%\bin\spark-submit --help 2>&1 | findstr /V "Usage" 1>&2
-exit /b 0
+call %SPARK_HOME%\bin\spark-submit2.cmd --help 2>&1 | findstr /V "Usage" 1>&2
+goto :eof
diff --git a/bin/spark-sql b/bin/spark-sql
index 3b6cc420fe..ca1729f4cf 100755
--- a/bin/spark-sql
+++ b/bin/spark-sql
@@ -25,12 +25,15 @@ set -o posix
# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
-CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
+export CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
# Figure out where Spark is installed
-FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
+export FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
function usage {
+ if [ -n "$1" ]; then
+ echo "$1"
+ fi
echo "Usage: ./bin/spark-sql [options] [cli option]"
pattern="usage"
pattern+="\|Spark assembly has been built with Hive"
@@ -42,16 +45,13 @@ function usage {
"$FWDIR"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
echo
echo "CLI options:"
- "$FWDIR"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
+ "$FWDIR"/bin/spark-class "$CLASS" --help 2>&1 | grep -v "$pattern" 1>&2
+ exit "$2"
}
+export -f usage
if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
- usage
- exit 0
+ usage "" 0
fi
-source "$FWDIR"/bin/utils.sh
-SUBMIT_USAGE_FUNCTION=usage
-gatherSparkSubmitOpts "$@"
-
-exec "$FWDIR"/bin/spark-submit --class $CLASS "${SUBMISSION_OPTS[@]}" spark-internal "${APPLICATION_OPTS[@]}"
+exec "$FWDIR"/bin/spark-submit --class "$CLASS" "$@"
diff --git a/bin/spark-submit b/bin/spark-submit
index 3e5cbdbb24..bcff78edd5 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -17,58 +17,18 @@
# limitations under the License.
#
-# NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
-
-export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
-ORIG_ARGS=("$@")
-
-# Set COLUMNS for progress bar
-export COLUMNS=`tput cols`
-
-while (($#)); do
- if [ "$1" = "--deploy-mode" ]; then
- SPARK_SUBMIT_DEPLOY_MODE=$2
- elif [ "$1" = "--properties-file" ]; then
- SPARK_SUBMIT_PROPERTIES_FILE=$2
- elif [ "$1" = "--driver-memory" ]; then
- export SPARK_SUBMIT_DRIVER_MEMORY=$2
- elif [ "$1" = "--driver-library-path" ]; then
- export SPARK_SUBMIT_LIBRARY_PATH=$2
- elif [ "$1" = "--driver-class-path" ]; then
- export SPARK_SUBMIT_CLASSPATH=$2
- elif [ "$1" = "--driver-java-options" ]; then
- export SPARK_SUBMIT_OPTS=$2
- elif [ "$1" = "--master" ]; then
- export MASTER=$2
- fi
- shift
-done
-
-if [ -z "$SPARK_CONF_DIR" ]; then
- export SPARK_CONF_DIR="$SPARK_HOME/conf"
-fi
-DEFAULT_PROPERTIES_FILE="$SPARK_CONF_DIR/spark-defaults.conf"
-if [ "$MASTER" == "yarn-cluster" ]; then
- SPARK_SUBMIT_DEPLOY_MODE=cluster
+SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
+
+# Only define a usage function if an upstream script hasn't done so.
+if ! type -t usage >/dev/null 2>&1; then
+ usage() {
+ if [ -n "$1" ]; then
+ echo "$1"
+ fi
+ "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit --help
+ exit "$2"
+ }
+ export -f usage
fi
-export SPARK_SUBMIT_DEPLOY_MODE=${SPARK_SUBMIT_DEPLOY_MODE:-"client"}
-export SPARK_SUBMIT_PROPERTIES_FILE=${SPARK_SUBMIT_PROPERTIES_FILE:-"$DEFAULT_PROPERTIES_FILE"}
-
-# For client mode, the driver will be launched in the same JVM that launches
-# SparkSubmit, so we may need to read the properties file for any extra class
-# paths, library paths, java options and memory early on. Otherwise, it will
-# be too late by the time the driver JVM has started.
-
-if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FILE" ]]; then
- # Parse the properties file only if the special configs exist
- contains_special_configs=$(
- grep -e "spark.driver.extra*\|spark.driver.memory" "$SPARK_SUBMIT_PROPERTIES_FILE" | \
- grep -v "^[[:space:]]*#"
- )
- if [ -n "$contains_special_configs" ]; then
- export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
- fi
-fi
-
-exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
+exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
diff --git a/bin/spark-submit2.cmd b/bin/spark-submit2.cmd
index 446cbc74b7..08ddb18574 100644
--- a/bin/spark-submit2.cmd
+++ b/bin/spark-submit2.cmd
@@ -17,62 +17,19 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
-rem NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
-
-set SPARK_HOME=%~dp0..
-set ORIG_ARGS=%*
-
-rem Reset the values of all variables used
-set SPARK_SUBMIT_DEPLOY_MODE=client
-
-if [%SPARK_CONF_DIR%] == [] (
- set SPARK_CONF_DIR=%SPARK_HOME%\conf
-)
-set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_CONF_DIR%\spark-defaults.conf
-set SPARK_SUBMIT_DRIVER_MEMORY=
-set SPARK_SUBMIT_LIBRARY_PATH=
-set SPARK_SUBMIT_CLASSPATH=
-set SPARK_SUBMIT_OPTS=
-set SPARK_SUBMIT_BOOTSTRAP_DRIVER=
-
-:loop
-if [%1] == [] goto continue
- if [%1] == [--deploy-mode] (
- set SPARK_SUBMIT_DEPLOY_MODE=%2
- ) else if [%1] == [--properties-file] (
- set SPARK_SUBMIT_PROPERTIES_FILE=%2
- ) else if [%1] == [--driver-memory] (
- set SPARK_SUBMIT_DRIVER_MEMORY=%2
- ) else if [%1] == [--driver-library-path] (
- set SPARK_SUBMIT_LIBRARY_PATH=%2
- ) else if [%1] == [--driver-class-path] (
- set SPARK_SUBMIT_CLASSPATH=%2
- ) else if [%1] == [--driver-java-options] (
- set SPARK_SUBMIT_OPTS=%2
- ) else if [%1] == [--master] (
- set MASTER=%2
- )
- shift
-goto loop
-:continue
-
-if [%MASTER%] == [yarn-cluster] (
- set SPARK_SUBMIT_DEPLOY_MODE=cluster
-)
-
-rem For client mode, the driver will be launched in the same JVM that launches
-rem SparkSubmit, so we may need to read the properties file for any extra class
-rem paths, library paths, java options and memory early on. Otherwise, it will
-rem be too late by the time the driver JVM has started.
-
-if [%SPARK_SUBMIT_DEPLOY_MODE%] == [client] (
- if exist %SPARK_SUBMIT_PROPERTIES_FILE% (
- rem Parse the properties file only if the special configs exist
- for /f %%i in ('findstr /r /c:"^[\t ]*spark.driver.memory" /c:"^[\t ]*spark.driver.extra" ^
- %SPARK_SUBMIT_PROPERTIES_FILE%') do (
- set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
- )
- )
+rem This is the entry point for running Spark submit. To avoid polluting the
+rem environment, it just launches a new cmd to do the real work.
+
+set CLASS=org.apache.spark.deploy.SparkSubmit
+call %~dp0spark-class2.cmd %CLASS% %*
+set SPARK_ERROR_LEVEL=%ERRORLEVEL%
+if not "x%SPARK_LAUNCHER_USAGE_ERROR%"=="x" (
+ call :usage
+ exit /b 1
)
+exit /b %SPARK_ERROR_LEVEL%
-cmd /V /E /C %SPARK_HOME%\bin\spark-class.cmd org.apache.spark.deploy.SparkSubmit %ORIG_ARGS%
+:usage
+echo %SPARK_LAUNCHER_USAGE_ERROR%
+call %SPARK_HOME%\bin\spark-class2.cmd %CLASS% --help
+goto :eof
diff --git a/bin/utils.sh b/bin/utils.sh
deleted file mode 100755
index 748dbe345a..0000000000
--- a/bin/utils.sh
+++ /dev/null
@@ -1,60 +0,0 @@
-#!/usr/bin/env bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Gather all spark-submit options into SUBMISSION_OPTS
-function gatherSparkSubmitOpts() {
-
- if [ -z "$SUBMIT_USAGE_FUNCTION" ]; then
- echo "Function for printing usage of $0 is not set." 1>&2
- echo "Please set usage function to shell variable 'SUBMIT_USAGE_FUNCTION' in $0" 1>&2
- exit 1
- fi
-
- # NOTE: If you add or remove spark-submit options,
- # modify NOT ONLY this script but also SparkSubmitArgument.scala
- SUBMISSION_OPTS=()
- APPLICATION_OPTS=()
- while (($#)); do
- case "$1" in
- --master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \
- --conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \
- --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
- --total-executor-cores | --executor-cores | --queue | --num-executors | --archives | \
- --proxy-user)
- if [[ $# -lt 2 ]]; then
- "$SUBMIT_USAGE_FUNCTION"
- exit 1;
- fi
- SUBMISSION_OPTS+=("$1"); shift
- SUBMISSION_OPTS+=("$1"); shift
- ;;
-
- --verbose | -v | --supervise)
- SUBMISSION_OPTS+=("$1"); shift
- ;;
-
- *)
- APPLICATION_OPTS+=("$1"); shift
- ;;
- esac
- done
-
- export SUBMISSION_OPTS
- export APPLICATION_OPTS
-}
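The option gathering deleted above is now centralized in the launcher library's SparkSubmitOptionParser, which Scala code extends via callbacks (see the SparkSubmitArguments diff further below, where handle(opt, value) is overridden). A rough sketch of that callback style; the option table and validation here are heavily simplified relative to the real parser:

import java.util.List;

// Simplified sketch of a callback-style option parser; the real
// SparkSubmitOptionParser knows the full spark-submit option table.
abstract class OptionParserSketch {
  protected static final String MASTER = "--master";
  protected static final String CLASS = "--class";

  // Subclasses override this to record each recognized option.
  protected abstract boolean handle(String opt, String value);

  void parse(List<String> args) {
    for (int i = 0; i < args.size(); i++) {
      String arg = args.get(i);
      if (arg.equals(MASTER) || arg.equals(CLASS)) {
        if (i + 1 >= args.size()) {
          throw new IllegalArgumentException("Missing argument for " + arg);
        }
        handle(arg, args.get(++i));
      }
    }
  }
}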
diff --git a/bin/windows-utils.cmd b/bin/windows-utils.cmd
deleted file mode 100644
index 0cf9e87ca5..0000000000
--- a/bin/windows-utils.cmd
+++ /dev/null
@@ -1,60 +0,0 @@
-rem
-rem Licensed to the Apache Software Foundation (ASF) under one or more
-rem contributor license agreements. See the NOTICE file distributed with
-rem this work for additional information regarding copyright ownership.
-rem The ASF licenses this file to You under the Apache License, Version 2.0
-rem (the "License"); you may not use this file except in compliance with
-rem the License. You may obtain a copy of the License at
-rem
-rem http://www.apache.org/licenses/LICENSE-2.0
-rem
-rem Unless required by applicable law or agreed to in writing, software
-rem distributed under the License is distributed on an "AS IS" BASIS,
-rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-rem See the License for the specific language governing permissions and
-rem limitations under the License.
-rem
-
-rem Gather all spark-submit options into SUBMISSION_OPTS
-
-set SUBMISSION_OPTS=
-set APPLICATION_OPTS=
-
-rem NOTE: If you add or remove spark-sumbmit options,
-rem modify NOT ONLY this script but also SparkSubmitArgument.scala
-
-:OptsLoop
-if "x%1"=="x" (
- goto :OptsLoopEnd
-)
-
-SET opts="\<--master\> \<--deploy-mode\> \<--class\> \<--name\> \<--jars\> \<--py-files\> \<--files\>"
-SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--driver-java-options\>"
-SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>"
-SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>"
-SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--packages\> \<--repositories\>"
-SET opts="%opts:~1,-1% \<--proxy-user\>"
-
-echo %1 | findstr %opts% >nul
-if %ERRORLEVEL% equ 0 (
- if "x%2"=="x" (
- echo "%1" requires an argument. >&2
- exit /b 1
- )
- set SUBMISSION_OPTS=%SUBMISSION_OPTS% %1 %2
- shift
- shift
- goto :OptsLoop
-)
-echo %1 | findstr "\<--verbose\> \<-v\> \<--supervise\>" >nul
-if %ERRORLEVEL% equ 0 (
- set SUBMISSION_OPTS=%SUBMISSION_OPTS% %1
- shift
- goto :OptsLoop
-)
-set APPLICATION_OPTS=%APPLICATION_OPTS% %1
-shift
-goto :OptsLoop
-
-:OptsLoopEnd
-exit /b 0
diff --git a/core/pom.xml b/core/pom.xml
index dc0d07d806..4164a3a720 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -78,6 +78,11 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
+ <artifactId>spark-launcher_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-network-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 82e66a3742..94e4bdbfb7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -18,18 +18,22 @@
package org.apache.spark.deploy
import java.net.URI
+import java.util.{List => JList}
import java.util.jar.JarFile
+import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import org.apache.spark.deploy.SparkSubmitAction._
+import org.apache.spark.launcher.SparkSubmitArgumentsParser
import org.apache.spark.util.Utils
/**
* Parses and encapsulates arguments from the spark-submit script.
* The env argument is used for testing.
*/
-private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) {
+private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
+ extends SparkSubmitArgumentsParser {
var master: String = null
var deployMode: String = null
var executorMemory: String = null
@@ -84,7 +88,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
}
// Set parameters from command line arguments
- parseOpts(args.toList)
+ try {
+ parse(args.toList)
+ } catch {
+ case e: IllegalArgumentException =>
+ SparkSubmit.printErrorAndExit(e.getMessage())
+ }
// Populate `sparkProperties` map from properties file
mergeDefaultSparkProperties()
// Use `sparkProperties` map along with env vars to fill in any missing parameters
@@ -277,167 +286,139 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
""".stripMargin
}
- /**
- * Fill in values by parsing user options.
- * NOTE: Any changes here must be reflected in YarnClientSchedulerBackend.
- */
- private def parseOpts(opts: Seq[String]): Unit = {
- val EQ_SEPARATED_OPT="""(--[^=]+)=(.+)""".r
-
- // Delineates parsing of Spark options from parsing of user options.
- parse(opts)
-
- /**
- * NOTE: If you add or remove spark-submit options,
- * modify NOT ONLY this file but also utils.sh
- */
- def parse(opts: Seq[String]): Unit = opts match {
- case ("--name") :: value :: tail =>
+ /** Fill in values by parsing user options. */
+ override protected def handle(opt: String, value: String): Boolean = {
+ opt match {
+ case NAME =>
name = value
- parse(tail)
- case ("--master") :: value :: tail =>
+ case MASTER =>
master = value
- parse(tail)
- case ("--class") :: value :: tail =>
+ case CLASS =>
mainClass = value
- parse(tail)
- case ("--deploy-mode") :: value :: tail =>
+ case DEPLOY_MODE =>
if (value != "client" && value != "cluster") {
SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
}
deployMode = value
- parse(tail)
- case ("--num-executors") :: value :: tail =>
+ case NUM_EXECUTORS =>
numExecutors = value
- parse(tail)
- case ("--total-executor-cores") :: value :: tail =>
+ case TOTAL_EXECUTOR_CORES =>
totalExecutorCores = value
- parse(tail)
- case ("--executor-cores") :: value :: tail =>
+ case EXECUTOR_CORES =>
executorCores = value
- parse(tail)
- case ("--executor-memory") :: value :: tail =>
+ case EXECUTOR_MEMORY =>
executorMemory = value
- parse(tail)
- case ("--driver-memory") :: value :: tail =>
+ case DRIVER_MEMORY =>
driverMemory = value
- parse(tail)
- case ("--driver-cores") :: value :: tail =>
+ case DRIVER_CORES =>
driverCores = value
- parse(tail)
- case ("--driver-class-path") :: value :: tail =>
+ case DRIVER_CLASS_PATH =>
driverExtraClassPath = value
- parse(tail)
- case ("--driver-java-options") :: value :: tail =>
+ case DRIVER_JAVA_OPTIONS =>
driverExtraJavaOptions = value
- parse(tail)
- case ("--driver-library-path") :: value :: tail =>
+ case DRIVER_LIBRARY_PATH =>
driverExtraLibraryPath = value
- parse(tail)
- case ("--properties-file") :: value :: tail =>
+ case PROPERTIES_FILE =>
propertiesFile = value
- parse(tail)
- case ("--kill") :: value :: tail =>
+ case KILL_SUBMISSION =>
submissionToKill = value
if (action != null) {
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $KILL.")
}
action = KILL
- parse(tail)
- case ("--status") :: value :: tail =>
+ case STATUS =>
submissionToRequestStatusFor = value
if (action != null) {
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $REQUEST_STATUS.")
}
action = REQUEST_STATUS
- parse(tail)
- case ("--supervise") :: tail =>
+ case SUPERVISE =>
supervise = true
- parse(tail)
- case ("--queue") :: value :: tail =>
+ case QUEUE =>
queue = value
- parse(tail)
- case ("--files") :: value :: tail =>
+ case FILES =>
files = Utils.resolveURIs(value)
- parse(tail)
- case ("--py-files") :: value :: tail =>
+ case PY_FILES =>
pyFiles = Utils.resolveURIs(value)
- parse(tail)
- case ("--archives") :: value :: tail =>
+ case ARCHIVES =>
archives = Utils.resolveURIs(value)
- parse(tail)
- case ("--jars") :: value :: tail =>
+ case JARS =>
jars = Utils.resolveURIs(value)
- parse(tail)
- case ("--packages") :: value :: tail =>
+ case PACKAGES =>
packages = value
- parse(tail)
- case ("--repositories") :: value :: tail =>
+ case REPOSITORIES =>
repositories = value
- parse(tail)
- case ("--conf" | "-c") :: value :: tail =>
+ case CONF =>
value.split("=", 2).toSeq match {
case Seq(k, v) => sparkProperties(k) = v
case _ => SparkSubmit.printErrorAndExit(s"Spark config without '=': $value")
}
- parse(tail)
- case ("--proxy-user") :: value :: tail =>
+ case PROXY_USER =>
proxyUser = value
- parse(tail)
- case ("--help" | "-h") :: tail =>
+ case HELP =>
printUsageAndExit(0)
- case ("--verbose" | "-v") :: tail =>
+ case VERBOSE =>
verbose = true
- parse(tail)
- case ("--version") :: tail =>
+ case VERSION =>
SparkSubmit.printVersionAndExit()
- case EQ_SEPARATED_OPT(opt, value) :: tail =>
- parse(opt :: value :: tail)
+ case _ =>
+ throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
+ }
+ true
+ }
- case value :: tail if value.startsWith("-") =>
- SparkSubmit.printErrorAndExit(s"Unrecognized option '$value'.")
+ /**
+ * Handle unrecognized command line options.
+ *
+ * The first unrecognized option is treated as the "primary resource". Everything else is
+ * treated as application arguments.
+ */
+ override protected def handleUnknown(opt: String): Boolean = {
+ if (opt.startsWith("-")) {
+ SparkSubmit.printErrorAndExit(s"Unrecognized option '$opt'.")
+ }
- case value :: tail =>
- primaryResource =
- if (!SparkSubmit.isShell(value) && !SparkSubmit.isInternal(value)) {
- Utils.resolveURI(value).toString
- } else {
- value
- }
- isPython = SparkSubmit.isPython(value)
- childArgs ++= tail
+ primaryResource =
+ if (!SparkSubmit.isShell(opt) && !SparkSubmit.isInternal(opt)) {
+ Utils.resolveURI(opt).toString
+ } else {
+ opt
+ }
+ isPython = SparkSubmit.isPython(opt)
+ false
+ }
- case Nil =>
- }
+ override protected def handleExtraArgs(extra: JList[String]): Unit = {
+ childArgs ++= extra
}
private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
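
The rewritten parser above relies on the callback contract of the new SparkSubmitOptionParser in the launcher package, whose source does not appear in this hunk: handle() receives each recognized option, handleUnknown() sees the first unrecognized token (the "primary resource") and returns false to stop option parsing, and handleExtraArgs() receives whatever is left. A self-contained sketch of that dispatch, using illustrative names rather than the real launcher API, might look like:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Self-contained sketch of the callback-style dispatch used above. All names
// here are illustrative; the real contract lives in the launcher package's
// SparkSubmitOptionParser, which is not part of this hunk.
public class OptionParserSketch {

  // A stand-in for the option table owned by the real parser.
  private static final List<String> KNOWN_OPTS = Arrays.asList("--master", "--class", "--name");

  // Called for each recognized option; returning false stops parsing.
  protected boolean handle(String opt, String value) {
    System.out.println("option " + opt + " = " + value);
    return true;
  }

  // Called for the first unrecognized token; returning false means the
  // remaining tokens are application arguments (the "primary resource" rule).
  protected boolean handleUnknown(String opt) {
    System.out.println("primary resource: " + opt);
    return false;
  }

  // Receives everything left over after parsing stops.
  protected void handleExtraArgs(List<String> extra) {
    System.out.println("app args: " + extra);
  }

  public void parse(List<String> args) {
    int idx = 0;
    while (idx < args.size()) {
      String arg = args.get(idx);
      if (KNOWN_OPTS.contains(arg)) {
        // Recognized options consume a value; a real parser would also
        // validate that the value is actually present.
        boolean more = handle(arg, args.get(idx + 1));
        idx += 2;
        if (!more) {
          break;
        }
      } else {
        boolean more = handleUnknown(arg);
        idx++;
        if (!more) {
          break;
        }
      }
    }
    handleExtraArgs(new ArrayList<String>(args.subList(idx, args.size())));
  }

  public static void main(String[] args) {
    new OptionParserSketch().parse(
        Arrays.asList("--master", "local[2]", "app.jar", "arg1", "arg2"));
  }
}
```

The benefit of this shape is that the option list lives in one place while each consumer, such as SparkSubmitArguments above, only overrides the callbacks it cares about.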
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
deleted file mode 100644
index 311048cdaa..0000000000
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy
-
-import scala.collection.JavaConversions._
-
-import org.apache.spark.util.{RedirectThread, Utils}
-
-/**
- * Launch an application through Spark submit in client mode with the appropriate classpath,
- * library paths, java options and memory. These properties of the JVM must be set before the
- * driver JVM is launched. The sole purpose of this class is to avoid handling the complexity
- * of parsing the properties file for such relevant configs in Bash.
- *
- * Usage: org.apache.spark.deploy.SparkSubmitDriverBootstrapper <submit args>
- */
-private[spark] object SparkSubmitDriverBootstrapper {
-
- // Note: This class depends on the behavior of `bin/spark-class` and `bin/spark-submit`.
- // Any changes made there must be reflected in this file.
-
- def main(args: Array[String]): Unit = {
-
- // This should be called only from `bin/spark-class`
- if (!sys.env.contains("SPARK_CLASS")) {
- System.err.println("SparkSubmitDriverBootstrapper must be called from `bin/spark-class`!")
- System.exit(1)
- }
-
- val submitArgs = args
- val runner = sys.env("RUNNER")
- val classpath = sys.env("CLASSPATH")
- val javaOpts = sys.env("JAVA_OPTS")
- val defaultDriverMemory = sys.env("OUR_JAVA_MEM")
-
- // Spark submit specific environment variables
- val deployMode = sys.env("SPARK_SUBMIT_DEPLOY_MODE")
- val propertiesFile = sys.env("SPARK_SUBMIT_PROPERTIES_FILE")
- val bootstrapDriver = sys.env("SPARK_SUBMIT_BOOTSTRAP_DRIVER")
- val submitDriverMemory = sys.env.get("SPARK_SUBMIT_DRIVER_MEMORY")
- val submitLibraryPath = sys.env.get("SPARK_SUBMIT_LIBRARY_PATH")
- val submitClasspath = sys.env.get("SPARK_SUBMIT_CLASSPATH")
- val submitJavaOpts = sys.env.get("SPARK_SUBMIT_OPTS")
-
- assume(runner != null, "RUNNER must be set")
- assume(classpath != null, "CLASSPATH must be set")
- assume(javaOpts != null, "JAVA_OPTS must be set")
- assume(defaultDriverMemory != null, "OUR_JAVA_MEM must be set")
- assume(deployMode == "client", "SPARK_SUBMIT_DEPLOY_MODE must be \"client\"!")
- assume(propertiesFile != null, "SPARK_SUBMIT_PROPERTIES_FILE must be set")
- assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set")
-
- // Parse the properties file for the equivalent spark.driver.* configs
- val properties = Utils.getPropertiesFromFile(propertiesFile)
- val confDriverMemory = properties.get("spark.driver.memory")
- val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
- val confClasspath = properties.get("spark.driver.extraClassPath")
- val confJavaOpts = properties.get("spark.driver.extraJavaOptions")
-
- // Favor Spark submit arguments over the equivalent configs in the properties file.
- // Note that we do not actually use the Spark submit values for library path, classpath,
- // and Java opts here, because we have already captured them in Bash.
-
- val newDriverMemory = submitDriverMemory
- .orElse(confDriverMemory)
- .getOrElse(defaultDriverMemory)
-
- val newClasspath =
- if (submitClasspath.isDefined) {
- classpath
- } else {
- classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("")
- }
-
- val newJavaOpts =
- if (submitJavaOpts.isDefined) {
- // SPARK_SUBMIT_OPTS is already captured in JAVA_OPTS
- javaOpts
- } else {
- javaOpts + confJavaOpts.map(" " + _).getOrElse("")
- }
-
- val filteredJavaOpts = Utils.splitCommandString(newJavaOpts)
- .filterNot(_.startsWith("-Xms"))
- .filterNot(_.startsWith("-Xmx"))
-
- // Build up command
- val command: Seq[String] =
- Seq(runner) ++
- Seq("-cp", newClasspath) ++
- filteredJavaOpts ++
- Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++
- Seq("org.apache.spark.deploy.SparkSubmit") ++
- submitArgs
-
- // Print the launch command. This follows closely the format used in `bin/spark-class`.
- if (sys.env.contains("SPARK_PRINT_LAUNCH_COMMAND")) {
- System.err.print("Spark Command: ")
- System.err.println(command.mkString(" "))
- System.err.println("========================================\n")
- }
-
- // Start the driver JVM
- val filteredCommand = command.filter(_.nonEmpty)
- val builder = new ProcessBuilder(filteredCommand)
- val env = builder.environment()
-
- if (submitLibraryPath.isEmpty && confLibraryPath.nonEmpty) {
- val libraryPaths = confLibraryPath ++ sys.env.get(Utils.libraryPathEnvName)
- env.put(Utils.libraryPathEnvName, libraryPaths.mkString(sys.props("path.separator")))
- }
-
- val process = builder.start()
-
- // If we kill an app while it's running, its sub-process should be killed too.
- Runtime.getRuntime().addShutdownHook(new Thread() {
- override def run() = {
- if (process != null) {
- process.destroy()
- process.waitFor()
- }
- }
- })
-
- // Redirect stdout and stderr from the child JVM
- val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
- val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
- stdoutThread.start()
- stderrThread.start()
-
- // Redirect stdin to child JVM only if we're not running Windows. This is because the
- // subprocess there already reads directly from our stdin, so we should avoid spawning a
- // thread that contends with the subprocess in reading from System.in.
- val isWindows = Utils.isWindows
- val isSubprocess = sys.env.contains("IS_SUBPROCESS")
- if (!isWindows) {
- val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin",
- propagateEof = true)
- stdinThread.start()
- // Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on
- // broken pipe, signaling that the parent process has exited. This is the case if the
- // application is launched directly from python, as in the PySpark shell. In Windows,
- // the termination logic is handled in java_gateway.py
- if (isSubprocess) {
- stdinThread.join()
- process.destroy()
- }
- }
- val returnCode = process.waitFor()
- stdoutThread.join()
- stderrThread.join()
- sys.exit(returnCode)
- }
-
-}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 3e013c3209..83f78cf473 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -20,10 +20,12 @@ package org.apache.spark.deploy.worker
import java.io.{File, FileOutputStream, InputStream, IOException}
import java.lang.System._
+import scala.collection.JavaConversions._
import scala.collection.Map
import org.apache.spark.Logging
import org.apache.spark.deploy.Command
+import org.apache.spark.launcher.WorkerCommandBuilder
import org.apache.spark.util.Utils
/**
@@ -54,12 +56,10 @@ object CommandUtils extends Logging {
}
private def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
- val runner = sys.env.get("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
-
// SPARK-698: do not call the run.cmd script, as process.destroy()
// fails to kill a process tree on Windows
- Seq(runner) ++ buildJavaOpts(command, memory, sparkHome) ++ Seq(command.mainClass) ++
- command.arguments
+ val cmd = new WorkerCommandBuilder(sparkHome, memory, command).buildCommand()
+ cmd.toSeq ++ Seq(command.mainClass) ++ command.arguments
}
/**
@@ -92,44 +92,6 @@ object CommandUtils extends Logging {
command.javaOpts)
}
- /**
- * Attention: this must always be aligned with the environment variables in the run scripts and
- * the way the JAVA_OPTS are assembled there.
- */
- private def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
- val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
-
- // Exists for backwards compatibility with older Spark versions
- val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString)
- .getOrElse(Nil)
- if (workerLocalOpts.length > 0) {
- logWarning("SPARK_JAVA_OPTS was set on the worker. It is deprecated in Spark 1.0.")
- logWarning("Set SPARK_LOCAL_DIRS for node-specific storage locations.")
- }
-
- // Figure out our classpath with the external compute-classpath script
- val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
- val classPath = Utils.executeAndGetOutput(
- Seq(sparkHome + "/bin/compute-classpath" + ext),
- extraEnvironment = command.environment)
- val userClassPath = command.classPathEntries ++ Seq(classPath)
-
- val javaVersion = System.getProperty("java.version")
-
- val javaOpts = workerLocalOpts ++ command.javaOpts
-
- val permGenOpt =
- if (!javaVersion.startsWith("1.8") && !javaOpts.exists(_.startsWith("-XX:MaxPermSize="))) {
- // do not specify -XX:MaxPermSize if it was already specified by user
- Some("-XX:MaxPermSize=128m")
- } else {
- None
- }
-
- Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
- permGenOpt ++ javaOpts ++ memoryOpts
- }
-
/** Spawn a thread that will redirect a given stream to a file */
def redirectStream(in: InputStream, file: File) {
val out = new FileOutputStream(file, true)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index bed0a08d4d..a897e53218 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -49,7 +49,6 @@ private[spark] class Executor(
isLocal: Boolean = false)
extends Logging
{
-
logInfo(s"Starting executor ID $executorId on host $executorHostname")
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
diff --git a/core/src/main/scala/org/apache/spark/launcher/SparkSubmitArgumentsParser.scala b/core/src/main/scala/org/apache/spark/launcher/SparkSubmitArgumentsParser.scala
new file mode 100644
index 0000000000..a835012531
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/launcher/SparkSubmitArgumentsParser.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+/**
+ * This class makes SparkSubmitOptionParser visible for Spark code outside of the `launcher`
+ * package, since Java doesn't have a feature similar to `private[spark]`, and we don't want
+ * that class to be public.
+ */
+private[spark] abstract class SparkSubmitArgumentsParser extends SparkSubmitOptionParser
diff --git a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
new file mode 100644
index 0000000000..9be98723ae
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+import java.io.File
+import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.deploy.Command
+
+/**
+ * This class is used by CommandUtils. It uses some package-private APIs in SparkLauncher; since
+ * Java doesn't have a feature similar to `private[spark]` and we don't want that class to be
+ * public, it needs to live in the same package as the rest of the library.
+ */
+private[spark] class WorkerCommandBuilder(sparkHome: String, memoryMb: Int, command: Command)
+ extends AbstractCommandBuilder {
+
+ childEnv.putAll(command.environment)
+ childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, sparkHome)
+
+ override def buildCommand(env: JMap[String, String]): JList[String] = {
+ val cmd = buildJavaCommand(command.classPathEntries.mkString(File.pathSeparator))
+ cmd.add(s"-Xms${memoryMb}M")
+ cmd.add(s"-Xmx${memoryMb}M")
+ command.javaOpts.foreach(cmd.add)
+ addPermGenSizeOpt(cmd)
+ addOptionString(cmd, getenv("SPARK_JAVA_OPTS"))
+ cmd
+ }
+
+ def buildCommand(): JList[String] = buildCommand(new JHashMap[String, String]())
+
+}
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index b5e04bd0c6..fa0b4e3705 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1369,6 +1369,11 @@ The [application submission guide](submitting-applications.html) describes how t
In short, once you package your application into a JAR (for Java/Scala) or a set of `.py` or `.zip` files (for Python),
the `bin/spark-submit` script lets you submit it to any supported cluster manager.
+# Launching Spark jobs from Java / Scala
+
+The [org.apache.spark.launcher](api/java/index.html?org/apache/spark/launcher/package-summary.html)
+package provides classes for launching Spark jobs as child processes using a simple Java API.
+
# Unit Testing
Spark is friendly to unit testing with any popular unit test framework.
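
To make the new "Launching Spark jobs from Java / Scala" section concrete, a minimal usage sketch follows. The builder methods shown (setAppResource, setMainClass, setMaster, launch) are assumed from the SparkLauncher javadoc later in this patch, so the generated API docs should be treated as authoritative:

```java
import org.apache.spark.launcher.SparkLauncher;

// Hedged sketch of launching an application through the new library; method
// names are assumed from SparkLauncher's builder-style API described below.
public class LauncherExample {
  public static void main(String[] args) throws Exception {
    Process spark = new SparkLauncher()
        .setAppResource("/path/to/app.jar")   // primary resource (jar or .py file)
        .setMainClass("com.example.MyApp")    // application entry point
        .setMaster("local[2]")                // any supported master URL
        .launch();                            // starts spark-submit as a child process
    int exitCode = spark.waitFor();           // wait for the child to finish
    System.out.println("Spark exited with code " + exitCode);
  }
}
```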
diff --git a/launcher/pom.xml b/launcher/pom.xml
new file mode 100644
index 0000000000..ccbd9d0419
--- /dev/null
+++ b/launcher/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.10</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-launcher_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Launcher Project</name>
+ <url>http://spark.apache.org/</url>
+ <properties>
+ <sbt.project.name>launcher</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <!-- NOTE: only test-scope dependencies are allowed in this module. -->
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Not needed by the test code, but referenced by SparkSubmit which is used by the tests. -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+</project>
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
new file mode 100644
index 0000000000..dc90e9e987
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.jar.JarFile;
+import java.util.regex.Pattern;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Abstract Spark command builder that defines common functionality.
+ */
+abstract class AbstractCommandBuilder {
+
+ boolean verbose;
+ String appName;
+ String appResource;
+ String deployMode;
+ String javaHome;
+ String mainClass;
+ String master;
+ String propertiesFile;
+ final List<String> appArgs;
+ final List<String> jars;
+ final List<String> files;
+ final List<String> pyFiles;
+ final Map<String, String> childEnv;
+ final Map<String, String> conf;
+
+ public AbstractCommandBuilder() {
+ this.appArgs = new ArrayList<String>();
+ this.childEnv = new HashMap<String, String>();
+ this.conf = new HashMap<String, String>();
+ this.files = new ArrayList<String>();
+ this.jars = new ArrayList<String>();
+ this.pyFiles = new ArrayList<String>();
+ }
+
+ /**
+ * Builds the command to execute.
+ *
+ * @param env A map containing environment variables for the child process. It may already contain
+ * entries defined by the user (such as SPARK_HOME, or those defined by the
+ * SparkLauncher constructor that takes an environment), and may be modified to
+ * include other variables needed by the process to be executed.
+ */
+ abstract List<String> buildCommand(Map<String, String> env) throws IOException;
+
+ /**
+ * Builds a list of arguments to run java.
+ *
+ * This method finds the java executable to use and appends JVM-specific options for running a
+ * class with Spark in the classpath. It also loads options from the "java-opts" file in the
+ * configuration directory being used.
+ *
+ * Callers should still add at least the class to run, as well as any arguments to pass to the
+ * class.
+ */
+ List<String> buildJavaCommand(String extraClassPath) throws IOException {
+ List<String> cmd = new ArrayList<String>();
+ if (javaHome == null) {
+ cmd.add(join(File.separator, System.getProperty("java.home"), "bin", "java"));
+ } else {
+ cmd.add(join(File.separator, javaHome, "bin", "java"));
+ }
+
+ // Load extra JAVA_OPTS from conf/java-opts, if it exists.
+ File javaOpts = new File(join(File.separator, getConfDir(), "java-opts"));
+ if (javaOpts.isFile()) {
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ new FileInputStream(javaOpts), "UTF-8"));
+ try {
+ String line;
+ while ((line = br.readLine()) != null) {
+ addOptionString(cmd, line);
+ }
+ } finally {
+ br.close();
+ }
+ }
+
+ cmd.add("-cp");
+ cmd.add(join(File.pathSeparator, buildClassPath(extraClassPath)));
+ return cmd;
+ }
+
+ /**
+ * Adds the default perm gen size option for Spark if the VM requires it and the user hasn't
+ * set it.
+ */
+ void addPermGenSizeOpt(List<String> cmd) {
+ // Don't set MaxPermSize for Java 8 and later.
+ String[] version = System.getProperty("java.version").split("\\.");
+ if (Integer.parseInt(version[0]) > 1 || Integer.parseInt(version[1]) > 7) {
+ return;
+ }
+
+ for (String arg : cmd) {
+ if (arg.startsWith("-XX:MaxPermSize=")) {
+ return;
+ }
+ }
+
+ cmd.add("-XX:MaxPermSize=128m");
+ }
+
+ void addOptionString(List<String> cmd, String options) {
+ if (!isEmpty(options)) {
+ for (String opt : parseOptionString(options)) {
+ cmd.add(opt);
+ }
+ }
+ }
+
+ /**
+ * Builds the classpath for the application. Returns a list with one classpath entry per element;
+ * each entry is formatted in the way expected by <i>java.net.URLClassLoader</i> (more
+ * specifically, with trailing slashes for directories).
+ */
+ List<String> buildClassPath(String appClassPath) throws IOException {
+ String sparkHome = getSparkHome();
+ String scala = getScalaVersion();
+
+ List<String> cp = new ArrayList<String>();
+ addToClassPath(cp, getenv("SPARK_CLASSPATH"));
+ addToClassPath(cp, appClassPath);
+
+ addToClassPath(cp, getConfDir());
+
+ boolean prependClasses = !isEmpty(getenv("SPARK_PREPEND_CLASSES"));
+ boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
+ if (prependClasses || isTesting) {
+ List<String> projects = Arrays.asList("core", "repl", "mllib", "bagel", "graphx",
+ "streaming", "tools", "sql/catalyst", "sql/core", "sql/hive", "sql/hive-thriftserver",
+ "yarn", "launcher");
+ if (prependClasses) {
+ System.err.println(
+ "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of " +
+ "assembly.");
+ for (String project : projects) {
+ addToClassPath(cp, String.format("%s/%s/target/scala-%s/classes", sparkHome, project,
+ scala));
+ }
+ }
+ if (isTesting) {
+ for (String project : projects) {
+ addToClassPath(cp, String.format("%s/%s/target/scala-%s/test-classes", sparkHome,
+ project, scala));
+ }
+ }
+
+ // Add this path to include jars that are shaded in the final deliverable created during
+ // the maven build. These jars are copied to this directory during the build.
+ addToClassPath(cp, String.format("%s/core/target/jars/*", sparkHome));
+ }
+
+ String assembly = findAssembly(scala);
+ addToClassPath(cp, assembly);
+
+ // When Hive support is needed, Datanucleus jars must be included on the classpath. Datanucleus
+ // jars do not work if only included in the uber jar as plugin.xml metadata is lost. Both sbt
+ // and maven will populate "lib_managed/jars/" with the datanucleus jars when Spark is built
+ // with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark
+ // assembly is built for Hive, before actually populating the CLASSPATH with the jars.
+ //
+ // This block also serves as a check for SPARK-1703, when the assembly jar is built with
+ // Java 7 and ends up with too many files, causing issues with other JDK versions.
+ boolean needsDataNucleus = false;
+ JarFile assemblyJar = null;
+ try {
+ assemblyJar = new JarFile(assembly);
+ needsDataNucleus = assemblyJar.getEntry("org/apache/hadoop/hive/ql/exec/") != null;
+ } catch (IOException ioe) {
+ if (ioe.getMessage().indexOf("invalid CEN header") >= 0) {
+ System.err.println(
+ "Loading Spark jar failed.\n" +
+ "This is likely because Spark was compiled with Java 7 and run\n" +
+ "with Java 6 (see SPARK-1703). Please use Java 7 to run Spark\n" +
+ "or build Spark with Java 6.");
+ System.exit(1);
+ } else {
+ throw ioe;
+ }
+ } finally {
+ if (assemblyJar != null) {
+ try {
+ assemblyJar.close();
+ } catch (IOException e) {
+ // Ignore.
+ }
+ }
+ }
+
+ if (needsDataNucleus) {
+ System.err.println("Spark assembly has been built with Hive, including Datanucleus jars " +
+ "in classpath.");
+ File libdir;
+ if (new File(sparkHome, "RELEASE").isFile()) {
+ libdir = new File(sparkHome, "lib");
+ } else {
+ libdir = new File(sparkHome, "lib_managed/jars");
+ }
+
+ checkState(libdir.isDirectory(), "Library directory '%s' does not exist.",
+ libdir.getAbsolutePath());
+ for (File jar : libdir.listFiles()) {
+ if (jar.getName().startsWith("datanucleus-")) {
+ addToClassPath(cp, jar.getAbsolutePath());
+ }
+ }
+ }
+
+ addToClassPath(cp, getenv("HADOOP_CONF_DIR"));
+ addToClassPath(cp, getenv("YARN_CONF_DIR"));
+ addToClassPath(cp, getenv("SPARK_DIST_CLASSPATH"));
+ return cp;
+ }
+
+ /**
+ * Adds entries to the classpath.
+ *
+ * @param cp List to which the new entries are appended.
+ * @param entries New classpath entries (separated by File.pathSeparator).
+ */
+ private void addToClassPath(List<String> cp, String entries) {
+ if (isEmpty(entries)) {
+ return;
+ }
+ String[] split = entries.split(Pattern.quote(File.pathSeparator));
+ for (String entry : split) {
+ if (!isEmpty(entry)) {
+ if (new File(entry).isDirectory() && !entry.endsWith(File.separator)) {
+ entry += File.separator;
+ }
+ cp.add(entry);
+ }
+ }
+ }
+
+ String getScalaVersion() {
+ String scala = getenv("SPARK_SCALA_VERSION");
+ if (scala != null) {
+ return scala;
+ }
+
+ String sparkHome = getSparkHome();
+ File scala210 = new File(sparkHome, "assembly/target/scala-2.10");
+ File scala211 = new File(sparkHome, "assembly/target/scala-2.11");
+ checkState(!scala210.isDirectory() || !scala211.isDirectory(),
+ "Presence of build for both scala versions (2.10 and 2.11) detected.\n" +
+ "Either clean one of them or set SPARK_SCALA_VERSION in your environment.");
+ if (scala210.isDirectory()) {
+ return "2.10";
+ } else {
+ checkState(scala211.isDirectory(), "Cannot find any assembly build directories.");
+ return "2.11";
+ }
+ }
+
+ String getSparkHome() {
+ String path = getenv(ENV_SPARK_HOME);
+ checkState(path != null,
+ "Spark home not found; set it explicitly or use the SPARK_HOME environment variable.");
+ return path;
+ }
+
+ /**
+ * Loads the configuration file for the application, if it exists. This is either the
+ * user-specified properties file, or the spark-defaults.conf file under the Spark configuration
+ * directory.
+ */
+ Properties loadPropertiesFile() throws IOException {
+ Properties props = new Properties();
+ File propsFile;
+ if (propertiesFile != null) {
+ propsFile = new File(propertiesFile);
+ checkArgument(propsFile.isFile(), "Invalid properties file '%s'.", propertiesFile);
+ } else {
+ propsFile = new File(getConfDir(), DEFAULT_PROPERTIES_FILE);
+ }
+
+ if (propsFile.isFile()) {
+ FileInputStream fd = null;
+ try {
+ fd = new FileInputStream(propsFile);
+ props.load(new InputStreamReader(fd, "UTF-8"));
+ } finally {
+ if (fd != null) {
+ try {
+ fd.close();
+ } catch (IOException e) {
+ // Ignore.
+ }
+ }
+ }
+ }
+
+ return props;
+ }
+
+ String getenv(String key) {
+ return firstNonEmpty(childEnv.get(key), System.getenv(key));
+ }
+
+ private String findAssembly(String scalaVersion) {
+ String sparkHome = getSparkHome();
+ File libdir;
+ if (new File(sparkHome, "RELEASE").isFile()) {
+ libdir = new File(sparkHome, "lib");
+ checkState(libdir.isDirectory(), "Library directory '%s' does not exist.",
+ libdir.getAbsolutePath());
+ } else {
+ libdir = new File(sparkHome, String.format("assembly/target/scala-%s", scalaVersion));
+ }
+
+ final Pattern re = Pattern.compile("spark-assembly.*hadoop.*\\.jar");
+ FileFilter filter = new FileFilter() {
+ @Override
+ public boolean accept(File file) {
+ return file.isFile() && re.matcher(file.getName()).matches();
+ }
+ };
+ File[] assemblies = libdir.listFiles(filter);
+ checkState(assemblies != null && assemblies.length > 0, "No assemblies found in '%s'.", libdir);
+ checkState(assemblies.length == 1, "Multiple assemblies found in '%s'.", libdir);
+ return assemblies[0].getAbsolutePath();
+ }
+
+ private String getConfDir() {
+ String confDir = getenv("SPARK_CONF_DIR");
+ return confDir != null ? confDir : join(File.separator, getSparkHome(), "conf");
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
new file mode 100644
index 0000000000..9b04732afe
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper methods for command builders.
+ */
+class CommandBuilderUtils {
+
+ static final String DEFAULT_MEM = "512m";
+ static final String DEFAULT_PROPERTIES_FILE = "spark-defaults.conf";
+ static final String ENV_SPARK_HOME = "SPARK_HOME";
+
+ /** Returns whether the given string is null or empty. */
+ static boolean isEmpty(String s) {
+ return s == null || s.isEmpty();
+ }
+
+ /** Joins a list of strings using the given separator. */
+ static String join(String sep, String... elements) {
+ StringBuilder sb = new StringBuilder();
+ for (String e : elements) {
+ if (e != null) {
+ if (sb.length() > 0) {
+ sb.append(sep);
+ }
+ sb.append(e);
+ }
+ }
+ return sb.toString();
+ }
+
+ /** Joins a list of strings using the given separator. */
+ static String join(String sep, Iterable<String> elements) {
+ StringBuilder sb = new StringBuilder();
+ for (String e : elements) {
+ if (e != null) {
+ if (sb.length() > 0) {
+ sb.append(sep);
+ }
+ sb.append(e);
+ }
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Returns the first non-empty value mapped to the given key in the given maps, or null otherwise.
+ */
+ static String firstNonEmptyValue(String key, Map<?, ?>... maps) {
+ for (Map<?, ?> map : maps) {
+ String value = (String) map.get(key);
+ if (!isEmpty(value)) {
+ return value;
+ }
+ }
+ return null;
+ }
+
+ /** Returns the first non-empty, non-null string in the given list, or null otherwise. */
+ static String firstNonEmpty(String... candidates) {
+ for (String s : candidates) {
+ if (!isEmpty(s)) {
+ return s;
+ }
+ }
+ return null;
+ }
+
+ /** Returns the name of the env variable that holds the native library path. */
+ static String getLibPathEnvName() {
+ if (isWindows()) {
+ return "PATH";
+ }
+
+ String os = System.getProperty("os.name");
+ if (os.startsWith("Mac OS X")) {
+ return "DYLD_LIBRARY_PATH";
+ } else {
+ return "LD_LIBRARY_PATH";
+ }
+ }
+
+ /** Returns whether the OS is Windows. */
+ static boolean isWindows() {
+ String os = System.getProperty("os.name");
+ return os.startsWith("Windows");
+ }
+
+ /**
+ * Updates the user environment, appending the given pathList to the existing value of the given
+ * environment variable (or setting it if it hasn't yet been set).
+ */
+ static void mergeEnvPathList(Map<String, String> userEnv, String envKey, String pathList) {
+ if (!isEmpty(pathList)) {
+ String current = firstNonEmpty(userEnv.get(envKey), System.getenv(envKey));
+ userEnv.put(envKey, join(File.pathSeparator, current, pathList));
+ }
+ }
+
+ /**
+ * Parse a string as if it were a list of arguments, following bash semantics.
+ * For example:
+ *
+ * Input: "\"ab cd\" efgh 'i \" j'"
+ * Output: [ "ab cd", "efgh", "i \" j" ]
+ */
+ static List<String> parseOptionString(String s) {
+ List<String> opts = new ArrayList<String>();
+ StringBuilder opt = new StringBuilder();
+ boolean inOpt = false;
+ boolean inSingleQuote = false;
+ boolean inDoubleQuote = false;
+ boolean escapeNext = false;
+
+ // This is needed to detect when a quoted empty string is used as an argument ("" or '').
+ boolean hasData = false;
+
+ for (int i = 0; i < s.length(); i++) {
+ int c = s.codePointAt(i);
+ if (escapeNext) {
+ opt.appendCodePoint(c);
+ escapeNext = false;
+ } else if (inOpt) {
+ switch (c) {
+ case '\\':
+ if (inSingleQuote) {
+ opt.appendCodePoint(c);
+ } else {
+ escapeNext = true;
+ }
+ break;
+ case '\'':
+ if (inDoubleQuote) {
+ opt.appendCodePoint(c);
+ } else {
+ inSingleQuote = !inSingleQuote;
+ }
+ break;
+ case '"':
+ if (inSingleQuote) {
+ opt.appendCodePoint(c);
+ } else {
+ inDoubleQuote = !inDoubleQuote;
+ }
+ break;
+ default:
+ if (!Character.isWhitespace(c) || inSingleQuote || inDoubleQuote) {
+ opt.appendCodePoint(c);
+ } else {
+ opts.add(opt.toString());
+ opt.setLength(0);
+ inOpt = false;
+ hasData = false;
+ }
+ }
+ } else {
+ switch (c) {
+ case '\'':
+ inSingleQuote = true;
+ inOpt = true;
+ hasData = true;
+ break;
+ case '"':
+ inDoubleQuote = true;
+ inOpt = true;
+ hasData = true;
+ break;
+ case '\\':
+ escapeNext = true;
+ inOpt = true;
+ hasData = true;
+ break;
+ default:
+ if (!Character.isWhitespace(c)) {
+ inOpt = true;
+ hasData = true;
+ opt.appendCodePoint(c);
+ }
+ }
+ }
+ }
+
+ checkArgument(!inSingleQuote && !inDoubleQuote && !escapeNext, "Invalid option string: %s", s);
+ if (hasData) {
+ opts.add(opt.toString());
+ }
+ return opts;
+ }
+
+ /** Throws IllegalArgumentException if the given object is null. */
+ static void checkNotNull(Object o, String arg) {
+ if (o == null) {
+ throw new IllegalArgumentException(String.format("'%s' must not be null.", arg));
+ }
+ }
+
+ /** Throws IllegalArgumentException with the given message if the check is false. */
+ static void checkArgument(boolean check, String msg, Object... args) {
+ if (!check) {
+ throw new IllegalArgumentException(String.format(msg, args));
+ }
+ }
+
+ /** Throws IllegalStateException with the given message if the check is false. */
+ static void checkState(boolean check, String msg, Object... args) {
+ if (!check) {
+ throw new IllegalStateException(String.format(msg, args));
+ }
+ }
+
+ /**
+ * Quote a command argument for a command to be run by a Windows batch script, if the argument
+ * needs quoting. Arguments only seem to need quotes in batch scripts if they have certain
+ * special characters, some of which need extra (and different) escaping.
+ *
+ * For example:
+ * original single argument: ab="cde fgh"
+ * quoted: "ab^=""cde fgh"""
+ */
+ static String quoteForBatchScript(String arg) {
+
+ boolean needsQuotes = false;
+ for (int i = 0; i < arg.length(); i++) {
+ int c = arg.codePointAt(i);
+ if (Character.isWhitespace(c) || c == '"' || c == '=') {
+ needsQuotes = true;
+ break;
+ }
+ }
+ if (!needsQuotes) {
+ return arg;
+ }
+ StringBuilder quoted = new StringBuilder();
+ quoted.append("\"");
+ for (int i = 0; i < arg.length(); i++) {
+ int cp = arg.codePointAt(i);
+ switch (cp) {
+ case '"':
+ quoted.append('"');
+ break;
+
+ case '=':
+ quoted.append('^');
+ break;
+
+ default:
+ break;
+ }
+ quoted.appendCodePoint(cp);
+ }
+ quoted.append("\"");
+ return quoted.toString();
+ }
+
+ /**
+ * Quotes a string so that it can be used in a command string and be parsed back into a single
+ * argument by python's "shlex.split()" function.
+ *
+ * Basically, just add simple escapes. E.g.:
+ * original single argument : ab "cd" ef
+ * after: "ab \"cd\" ef"
+ */
+ static String quoteForPython(String s) {
+ StringBuilder quoted = new StringBuilder().append('"');
+ for (int i = 0; i < s.length(); i++) {
+ int cp = s.codePointAt(i);
+ if (cp == '"' || cp == '\\') {
+ quoted.appendCodePoint('\\');
+ }
+ quoted.appendCodePoint(cp);
+ }
+ return quoted.append('"').toString();
+ }
+
+}
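
As an illustration of the two quoting/splitting helpers above, the following sketch assumes it is compiled into the same org.apache.spark.launcher package (for example, as a test class), because the helpers are package-private:

```java
package org.apache.spark.launcher;

import java.util.List;

// Illustrative only: exercises the behavior documented in the javadocs of
// parseOptionString and quoteForBatchScript above.
class CommandBuilderUtilsDemo {
  public static void main(String[] args) {
    // Bash-style splitting, matching the javadoc example for parseOptionString.
    List<String> opts = CommandBuilderUtils.parseOptionString("\"ab cd\" efgh 'i \" j'");
    System.out.println(opts); // [ab cd, efgh, i " j]

    // Batch-script quoting: embedded quotes are doubled and '=' is escaped.
    System.out.println(CommandBuilderUtils.quoteForBatchScript("ab=\"cde fgh\""));
    // "ab^=""cde fgh"""
  }
}
```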
diff --git a/launcher/src/main/java/org/apache/spark/launcher/Main.java b/launcher/src/main/java/org/apache/spark/launcher/Main.java
new file mode 100644
index 0000000000..206acfb514
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/Main.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Command line interface for the Spark launcher. Used internally by Spark scripts.
+ */
+class Main {
+
+ /**
+ * Usage: Main [class] [class args]
+ * <p/>
+ * This CLI works in two different modes:
+ * <ul>
+ * <li>"spark-submit": if <i>class</i> is "org.apache.spark.deploy.SparkSubmit", the
+ * {@link SparkSubmitCommandBuilder} class is used to launch a Spark application.</li>
+ * <li>"spark-class": if another class is provided, an internal Spark class is run.</li>
+ * </ul>
+ *
+ * This class works in tandem with the "bin/spark-class" script on Unix-like systems, and the
+ * "bin/spark-class2.cmd" batch script on Windows, to execute the final command.
+ * <p/>
+ * On Unix-like systems, the output is a list of command arguments, separated by the NULL
+ * character. On Windows, the output is a command line suitable for direct execution from the
+ * script.
+ */
+ public static void main(String[] argsArray) throws Exception {
+ checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
+
+ List<String> args = new ArrayList<String>(Arrays.asList(argsArray));
+ String className = args.remove(0);
+
+ boolean printLaunchCommand;
+ boolean printUsage;
+ AbstractCommandBuilder builder;
+ try {
+ if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
+ builder = new SparkSubmitCommandBuilder(args);
+ } else {
+ builder = new SparkClassCommandBuilder(className, args);
+ }
+ printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
+ printUsage = false;
+ } catch (IllegalArgumentException e) {
+ builder = new UsageCommandBuilder(e.getMessage());
+ printLaunchCommand = false;
+ printUsage = true;
+ }
+
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> cmd = builder.buildCommand(env);
+ if (printLaunchCommand) {
+ System.err.println("Spark Command: " + join(" ", cmd));
+ System.err.println("========================================");
+ }
+
+ if (isWindows()) {
+ // When printing the usage message, we can't use "cmd /v" since that prevents the env
+ // variable from being seen in the caller script. So do not call prepareWindowsCommand().
+ if (printUsage) {
+ System.out.println(join(" ", cmd));
+ } else {
+ System.out.println(prepareWindowsCommand(cmd, env));
+ }
+ } else {
+ // In bash, use NULL as the arg separator since it cannot be used in an argument.
+ List<String> bashCmd = prepareBashCommand(cmd, env);
+ for (String c : bashCmd) {
+ System.out.print(c);
+ System.out.print('\0');
+ }
+ }
+ }
+
+ /**
+ * Prepare a command line for execution from a Windows batch script.
+ *
+ * The method quotes all arguments so that spaces are handled as expected. Quotes within arguments
+ * are "double quoted" (which is batch for escaping a quote). This page has more details about
+ * quoting and other batch script fun stuff: http://ss64.com/nt/syntax-esc.html
+ *
+ * The command is executed using "cmd /c" and formatted as a single line, since that's the
+ * easiest way to consume this from a batch script (see spark-class2.cmd).
+ */
+ private static String prepareWindowsCommand(List<String> cmd, Map<String, String> childEnv) {
+ StringBuilder cmdline = new StringBuilder("cmd /c \"");
+ for (Map.Entry<String, String> e : childEnv.entrySet()) {
+ cmdline.append(String.format("set %s=%s", e.getKey(), e.getValue()));
+ cmdline.append(" && ");
+ }
+ for (String arg : cmd) {
+ cmdline.append(quoteForBatchScript(arg));
+ cmdline.append(" ");
+ }
+ cmdline.append("\"");
+ return cmdline.toString();
+ }
+
+ /**
+ * Prepare the command for execution from a bash script. The final command will have commands to
+ * set up any needed environment variables needed by the child process.
+ */
+ private static List<String> prepareBashCommand(List<String> cmd, Map<String, String> childEnv) {
+ if (childEnv.isEmpty()) {
+ return cmd;
+ }
+
+ List<String> newCmd = new ArrayList<String>();
+ newCmd.add("env");
+
+ for (Map.Entry<String, String> e : childEnv.entrySet()) {
+ newCmd.add(String.format("%s=%s", e.getKey(), e.getValue()));
+ }
+ newCmd.addAll(cmd);
+ return newCmd;
+ }
+
+ /**
+ * Internal builder used when command line parsing fails. This will behave differently depending
+ * on the platform:
+ *
+ * - On Unix-like systems, it will print a call to the "usage" function with two arguments:
+ * the error string and the exit code to use. The function is expected to print the command's
+ * usage and exit with the provided exit code. The script should use "export -f usage" after
+ * declaring a function called "usage", so that the function is available to downstream scripts.
+ *
+ * - On Windows it will set the variable "SPARK_LAUNCHER_USAGE_ERROR" to the usage error message.
+ * The batch script should check for this variable and print its usage, since batch scripts
+ * don't really support the "export -f" functionality used in bash.
+ */
+ private static class UsageCommandBuilder extends AbstractCommandBuilder {
+
+ private final String message;
+
+ UsageCommandBuilder(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public List<String> buildCommand(Map<String, String> env) {
+ if (isWindows()) {
+ return Arrays.asList("set", "SPARK_LAUNCHER_USAGE_ERROR=" + message);
+ } else {
+ return Arrays.asList("usage", message, "1");
+ }
+ }
+
+ }
+
+}
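
The NULL-separated output protocol described in Main's javadoc is consumed by the shell scripts, but a sketch of the equivalent parsing in Java may help clarify the format (the reader below is illustrative, not part of the library):

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

// Illustrative reader for the NULL-separated argument list that Main prints
// on Unix-like systems; the real consumer is the bin/spark-class script.
class NullSeparatedReaderDemo {
  public static void main(String[] args) {
    byte[] sample = "java\0-cp\0/some/path\0org.Foo\0".getBytes(StandardCharsets.UTF_8);
    List<String> cmd = new ArrayList<String>();
    Scanner scanner = new Scanner(new ByteArrayInputStream(sample), "UTF-8");
    scanner.useDelimiter("\0"); // arguments are separated by the NULL character
    while (scanner.hasNext()) {
      cmd.add(scanner.next());
    }
    scanner.close();
    System.out.println(cmd); // [java, -cp, /some/path, org.Foo]
  }
}
```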
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
new file mode 100644
index 0000000000..e601a0a19f
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Command builder for internal Spark classes.
+ * <p/>
+ * This class handles building the command to launch all internal Spark classes except for
+ * SparkSubmit (which is handled by the {@link SparkSubmitCommandBuilder} class).
+ */
+class SparkClassCommandBuilder extends AbstractCommandBuilder {
+
+ private final String className;
+ private final List<String> classArgs;
+
+ SparkClassCommandBuilder(String className, List<String> classArgs) {
+ this.className = className;
+ this.classArgs = classArgs;
+ }
+
+ @Override
+ public List<String> buildCommand(Map<String, String> env) throws IOException {
+ List<String> javaOptsKeys = new ArrayList<String>();
+ String memKey = null;
+ String extraClassPath = null;
+
+ // Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) +
+ // SPARK_DAEMON_MEMORY.
+ if (className.equals("org.apache.spark.deploy.master.Master")) {
+ javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_MASTER_OPTS");
+ memKey = "SPARK_DAEMON_MEMORY";
+ } else if (className.equals("org.apache.spark.deploy.worker.Worker")) {
+ javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_WORKER_OPTS");
+ memKey = "SPARK_DAEMON_MEMORY";
+ } else if (className.equals("org.apache.spark.deploy.history.HistoryServer")) {
+ javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_HISTORY_OPTS");
+ memKey = "SPARK_DAEMON_MEMORY";
+ } else if (className.equals("org.apache.spark.executor.CoarseGrainedExecutorBackend")) {
+ javaOptsKeys.add("SPARK_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
+ memKey = "SPARK_EXECUTOR_MEMORY";
+ } else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) {
+ javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
+ memKey = "SPARK_EXECUTOR_MEMORY";
+ } else if (className.startsWith("org.apache.spark.tools.")) {
+ String sparkHome = getSparkHome();
+ File toolsDir = new File(join(File.separator, sparkHome, "tools", "target",
+ "scala-" + getScalaVersion()));
+ checkState(toolsDir.isDirectory(), "Cannot find tools build directory.");
+
+ Pattern re = Pattern.compile("spark-tools_.*\\.jar");
+ for (File f : toolsDir.listFiles()) {
+ if (re.matcher(f.getName()).matches()) {
+ extraClassPath = f.getAbsolutePath();
+ break;
+ }
+ }
+
+ checkState(extraClassPath != null,
+ "Failed to find Spark Tools Jar in %s.\n" +
+ "You need to run \"build/sbt tools/package\" before running %s.",
+ toolsDir.getAbsolutePath(), className);
+
+ javaOptsKeys.add("SPARK_JAVA_OPTS");
+ }
+
+ List<String> cmd = buildJavaCommand(extraClassPath);
+ for (String key : javaOptsKeys) {
+ addOptionString(cmd, System.getenv(key));
+ }
+
+ String mem = firstNonEmpty(memKey != null ? System.getenv(memKey) : null, DEFAULT_MEM);
+ cmd.add("-Xms" + mem);
+ cmd.add("-Xmx" + mem);
+ addPermGenSizeOpt(cmd);
+ cmd.add(className);
+ cmd.addAll(classArgs);
+ return cmd;
+ }
+
+}
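For illustration, a minimal sketch of how this builder composes a daemon command. This is hypothetical code, not part of the patch; since SparkClassCommandBuilder is package-private it would have to live in org.apache.spark.launcher, and it assumes SPARK_HOME and SPARK_DAEMON_MEMORY are set in the environment.

    package org.apache.spark.launcher;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class MasterCommandExample {
      public static void main(String[] args) throws Exception {
        // Hypothetical example: build the launch command for the standalone Master.
        List<String> classArgs = Arrays.asList("--host", "localhost", "--port", "7077");
        SparkClassCommandBuilder builder =
            new SparkClassCommandBuilder("org.apache.spark.deploy.master.Master", classArgs);
        // With SPARK_DAEMON_MEMORY=2g in the environment, the resulting command
        // should contain "-Xms2g" and "-Xmx2g", followed by the class name and args.
        Map<String, String> env = new HashMap<String, String>();
        for (String part : builder.buildCommand(env)) {
          System.out.println(part);
        }
      }
    }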
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
new file mode 100644
index 0000000000..b566507ee6
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Launcher for Spark applications.
+ * <p/>
+ * Use this class to start Spark applications programmatically. The class uses a builder pattern
+ * to allow clients to configure the Spark application and launch it as a child process.
+ */
+public class SparkLauncher {
+
+ /** Configuration key for the Spark master. */
+ public static final String SPARK_MASTER = "spark.master";
+
+ /** Configuration key for the driver memory. */
+ public static final String DRIVER_MEMORY = "spark.driver.memory";
+ /** Configuration key for the driver class path. */
+ public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
+ /** Configuration key for the driver VM options. */
+ public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";
+ /** Configuration key for the driver native library path. */
+ public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath";
+
+ /** Configuration key for the executor memory. */
+ public static final String EXECUTOR_MEMORY = "spark.executor.memory";
+ /** Configuration key for the executor class path. */
+ public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
+ /** Configuration key for the executor VM options. */
+ public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
+ /** Configuration key for the executor native library path. */
+ public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath";
+ /** Configuration key for the number of executor CPU cores. */
+ public static final String EXECUTOR_CORES = "spark.executor.cores";
+
+ private final SparkSubmitCommandBuilder builder;
+
+ public SparkLauncher() {
+ this(null);
+ }
+
+ /**
+ * Creates a launcher that will set the given environment variables in the child.
+ *
+ * @param env Environment variables to set.
+ */
+ public SparkLauncher(Map<String, String> env) {
+ this.builder = new SparkSubmitCommandBuilder();
+ if (env != null) {
+ this.builder.childEnv.putAll(env);
+ }
+ }
+
+ /**
+ * Set a custom JAVA_HOME for launching the Spark application.
+ *
+ * @param javaHome Path to the JAVA_HOME to use.
+ * @return This launcher.
+ */
+ public SparkLauncher setJavaHome(String javaHome) {
+ checkNotNull(javaHome, "javaHome");
+ builder.javaHome = javaHome;
+ return this;
+ }
+
+ /**
+ * Set a custom Spark installation location for the application.
+ *
+ * @param sparkHome Path to the Spark installation to use.
+ * @return This launcher.
+ */
+ public SparkLauncher setSparkHome(String sparkHome) {
+ checkNotNull(sparkHome, "sparkHome");
+ builder.childEnv.put(ENV_SPARK_HOME, sparkHome);
+ return this;
+ }
+
+ /**
+ * Set a custom properties file with Spark configuration for the application.
+ *
+ * @param path Path to custom properties file to use.
+ * @return This launcher.
+ */
+ public SparkLauncher setPropertiesFile(String path) {
+ checkNotNull(path, "path");
+ builder.propertiesFile = path;
+ return this;
+ }
+
+ /**
+ * Set a single configuration value for the application.
+ *
+ * @param key Configuration key.
+ * @param value The value to use.
+ * @return This launcher.
+ */
+ public SparkLauncher setConf(String key, String value) {
+ checkNotNull(key, "key");
+ checkNotNull(value, "value");
+ checkArgument(key.startsWith("spark."), "'key' must start with 'spark.'");
+ builder.conf.put(key, value);
+ return this;
+ }
+
+ /**
+ * Set the application name.
+ *
+ * @param appName Application name.
+ * @return This launcher.
+ */
+ public SparkLauncher setAppName(String appName) {
+ checkNotNull(appName, "appName");
+ builder.appName = appName;
+ return this;
+ }
+
+ /**
+ * Set the Spark master for the application.
+ *
+ * @param master Spark master.
+ * @return This launcher.
+ */
+ public SparkLauncher setMaster(String master) {
+ checkNotNull(master, "master");
+ builder.master = master;
+ return this;
+ }
+
+ /**
+ * Set the deploy mode for the application.
+ *
+ * @param mode Deploy mode.
+ * @return This launcher.
+ */
+ public SparkLauncher setDeployMode(String mode) {
+ checkNotNull(mode, "mode");
+ builder.deployMode = mode;
+ return this;
+ }
+
+ /**
+ * Set the main application resource. This should be the location of a jar file for Scala/Java
+ * applications, or a Python script for PySpark applications.
+ *
+ * @param resource Path to the main application resource.
+ * @return This launcher.
+ */
+ public SparkLauncher setAppResource(String resource) {
+ checkNotNull(resource, "resource");
+ builder.appResource = resource;
+ return this;
+ }
+
+ /**
+ * Sets the application class name for Java/Scala applications.
+ *
+ * @param mainClass Application's main class.
+ * @return This launcher.
+ */
+ public SparkLauncher setMainClass(String mainClass) {
+ checkNotNull(mainClass, "mainClass");
+ builder.mainClass = mainClass;
+ return this;
+ }
+
+ /**
+ * Adds command line arguments for the application.
+ *
+ * @param args Arguments to pass to the application's main class.
+ * @return This launcher.
+ */
+ public SparkLauncher addAppArgs(String... args) {
+ for (String arg : args) {
+ checkNotNull(arg, "arg");
+ builder.appArgs.add(arg);
+ }
+ return this;
+ }
+
+ /**
+ * Adds a jar file to be submitted with the application.
+ *
+ * @param jar Path to the jar file.
+ * @return This launcher.
+ */
+ public SparkLauncher addJar(String jar) {
+ checkNotNull(jar, "jar");
+ builder.jars.add(jar);
+ return this;
+ }
+
+ /**
+ * Adds a file to be submitted with the application.
+ *
+ * @param file Path to the file.
+ * @return This launcher.
+ */
+ public SparkLauncher addFile(String file) {
+ checkNotNull(file, "file");
+ builder.files.add(file);
+ return this;
+ }
+
+ /**
+ * Adds a Python file / zip / egg to be submitted with the application.
+ *
+ * @param file Path to the file.
+ * @return This launcher.
+ */
+ public SparkLauncher addPyFile(String file) {
+ checkNotNull(file, "file");
+ builder.pyFiles.add(file);
+ return this;
+ }
+
+ /**
+ * Enables verbose reporting for SparkSubmit.
+ *
+ * @param verbose Whether to enable verbose output.
+ * @return This launcher.
+ */
+ public SparkLauncher setVerbose(boolean verbose) {
+ builder.verbose = verbose;
+ return this;
+ }
+
+ /**
+ * Launches a sub-process that will start the configured Spark application.
+ *
+ * @return A process handle for the Spark app.
+ */
+ public Process launch() throws IOException {
+ List<String> cmd = new ArrayList<String>();
+ String script = isWindows() ? "spark-submit.cmd" : "spark-submit";
+ cmd.add(join(File.separator, builder.getSparkHome(), "bin", script));
+ cmd.addAll(builder.buildSparkSubmitArgs());
+
+ // Since the child process is a batch script, let's quote things so that special characters are
+ // preserved, otherwise the batch interpreter will mess up the arguments. Batch scripts are
+ // weird.
+ if (isWindows()) {
+ List<String> winCmd = new ArrayList<String>();
+ for (String arg : cmd) {
+ winCmd.add(quoteForBatchScript(arg));
+ }
+ cmd = winCmd;
+ }
+
+ ProcessBuilder pb = new ProcessBuilder(cmd.toArray(new String[cmd.size()]));
+ for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
+ pb.environment().put(e.getKey(), e.getValue());
+ }
+ return pb.start();
+ }
+
+}
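As a usage sketch (paths are hypothetical; it assumes a built Spark distribution, and it mirrors the SPARK_PRINT_LAUNCH_COMMAND environment-variable handling exercised by SparkLauncherSuite below):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.launcher.SparkLauncher;

    public class EnvLauncherExample {
      public static void main(String[] args) throws Exception {
        Map<String, String> env = new HashMap<String, String>();
        // Ask the child process to echo the final command line it runs.
        env.put("SPARK_PRINT_LAUNCH_COMMAND", "1");
        Process spark = new SparkLauncher(env)
            .setSparkHome("/path/to/spark")       // hypothetical path
            .setAppResource("/path/to/app.jar")   // hypothetical jar
            .setMainClass("my.spark.app.Main")
            .setMaster("local")
            .setConf(SparkLauncher.DRIVER_MEMORY, "1g")
            .addAppArgs("arg1")
            .launch();
        int exitCode = spark.waitFor();
        System.out.println("Spark exited with code " + exitCode);
      }
    }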
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
new file mode 100644
index 0000000000..6ffdff63d3
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Special command builder for handling a CLI invocation of SparkSubmit.
+ * <p/>
+ * This builder adds command line parsing compatible with SparkSubmit. It handles setting
+ * driver-side options and the special parsing behavior needed for certain internal Spark
+ * applications.
+ * <p/>
+ * This class also has some special features to aid in launching pyspark.
+ */
+class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
+
+ /**
+ * Name of the app resource used to identify the PySpark shell. The command line parser expects
+ * the resource name to be the very first argument to spark-submit in this case.
+ *
+ * NOTE: this cannot be "pyspark-shell", since that identifies the PySpark shell to SparkSubmit
+ * (see java_gateway.py) and could cause this code to enter an infinite loop.
+ */
+ static final String PYSPARK_SHELL = "pyspark-shell-main";
+
+ /**
+ * This is the actual resource name that identifies the PySpark shell to SparkSubmit.
+ */
+ static final String PYSPARK_SHELL_RESOURCE = "pyspark-shell";
+
+ /**
+ * This map must contain the class names of the available special classes, since they modify the
+ * way command line parsing works. It maps each class name to the resource to use when calling
+ * spark-submit.
+ */
+ private static final Map<String, String> specialClasses = new HashMap<String, String>();
+ static {
+ specialClasses.put("org.apache.spark.repl.Main", "spark-shell");
+ specialClasses.put("org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver",
+ "spark-internal");
+ specialClasses.put("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
+ "spark-internal");
+ }
+
+ private final List<String> sparkArgs;
+
+ /**
+ * Controls whether mixing spark-submit arguments with app arguments is allowed. This is needed
+ * to parse the command lines for things like bin/spark-shell, which allows users to mix and
+ * match arguments (e.g. "bin/spark-shell SparkShellArg --master foo").
+ */
+ private boolean allowsMixedArguments;
+
+ SparkSubmitCommandBuilder() {
+ this.sparkArgs = new ArrayList<String>();
+ }
+
+ SparkSubmitCommandBuilder(List<String> args) {
+ this();
+ List<String> submitArgs = args;
+ if (args.size() > 0 && args.get(0).equals(PYSPARK_SHELL)) {
+ this.allowsMixedArguments = true;
+ appResource = PYSPARK_SHELL_RESOURCE;
+ submitArgs = args.subList(1, args.size());
+ } else {
+ this.allowsMixedArguments = false;
+ }
+
+ new OptionParser().parse(submitArgs);
+ }
+
+ @Override
+ public List<String> buildCommand(Map<String, String> env) throws IOException {
+ if (PYSPARK_SHELL_RESOURCE.equals(appResource)) {
+ return buildPySparkShellCommand(env);
+ } else {
+ return buildSparkSubmitCommand(env);
+ }
+ }
+
+ List<String> buildSparkSubmitArgs() {
+ List<String> args = new ArrayList<String>();
+ SparkSubmitOptionParser parser = new SparkSubmitOptionParser();
+
+ if (verbose) {
+ args.add(parser.VERBOSE);
+ }
+
+ if (master != null) {
+ args.add(parser.MASTER);
+ args.add(master);
+ }
+
+ if (deployMode != null) {
+ args.add(parser.DEPLOY_MODE);
+ args.add(deployMode);
+ }
+
+ if (appName != null) {
+ args.add(parser.NAME);
+ args.add(appName);
+ }
+
+ for (Map.Entry<String, String> e : conf.entrySet()) {
+ args.add(parser.CONF);
+ args.add(String.format("%s=%s", e.getKey(), e.getValue()));
+ }
+
+ if (propertiesFile != null) {
+ args.add(parser.PROPERTIES_FILE);
+ args.add(propertiesFile);
+ }
+
+ if (!jars.isEmpty()) {
+ args.add(parser.JARS);
+ args.add(join(",", jars));
+ }
+
+ if (!files.isEmpty()) {
+ args.add(parser.FILES);
+ args.add(join(",", files));
+ }
+
+ if (!pyFiles.isEmpty()) {
+ args.add(parser.PY_FILES);
+ args.add(join(",", pyFiles));
+ }
+
+ if (mainClass != null) {
+ args.add(parser.CLASS);
+ args.add(mainClass);
+ }
+
+ args.addAll(sparkArgs);
+ if (appResource != null) {
+ args.add(appResource);
+ }
+ args.addAll(appArgs);
+
+ return args;
+ }
+
+ private List<String> buildSparkSubmitCommand(Map<String, String> env) throws IOException {
+ // Load the properties file and check whether spark-submit will be running the app's driver
+ // or just launching a cluster app. When running the driver, the JVM's arguments will be
+ // modified to cover the driver's configuration.
+ Properties props = loadPropertiesFile();
+ boolean isClientMode = isClientMode(props);
+ String extraClassPath = isClientMode ?
+ firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_CLASSPATH, conf, props) : null;
+
+ List<String> cmd = buildJavaCommand(extraClassPath);
+ addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
+ addOptionString(cmd, System.getenv("SPARK_JAVA_OPTS"));
+
+ if (isClientMode) {
+ // Figuring out where the memory value comes from is a little tricky due to precedence.
+ // Precedence is observed in the following order:
+ // - explicit configuration (setConf()), which also covers the --driver-memory CLI argument.
+ // - properties file.
+ // - SPARK_DRIVER_MEMORY env variable.
+ // - SPARK_MEM env variable.
+ // - default value (512m).
+ String memory = firstNonEmpty(firstNonEmptyValue(SparkLauncher.DRIVER_MEMORY, conf, props),
+ System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
+ cmd.add("-Xms" + memory);
+ cmd.add("-Xmx" + memory);
+ addOptionString(cmd, firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, conf, props));
+ mergeEnvPathList(env, getLibPathEnvName(),
+ firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, conf, props));
+ }
+
+ addPermGenSizeOpt(cmd);
+ cmd.add("org.apache.spark.deploy.SparkSubmit");
+ cmd.addAll(buildSparkSubmitArgs());
+ return cmd;
+ }
+
+ private List<String> buildPySparkShellCommand(Map<String, String> env) throws IOException {
+ // For backwards compatibility, if a script is specified in
+ // the pyspark command line, then run it using spark-submit.
+ if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".py")) {
+ System.err.println(
+ "WARNING: Running python applications through 'pyspark' is deprecated as of Spark 1.0.\n" +
+ "Use ./bin/spark-submit <python file>");
+ appResource = appArgs.get(0);
+ appArgs.remove(0);
+ return buildCommand(env);
+ }
+
+ // When launching the pyspark shell, the spark-submit arguments should be stored in the
+ // PYSPARK_SUBMIT_ARGS env variable. The executable is the PYSPARK_DRIVER_PYTHON env variable
+ // set by the pyspark script, followed by PYSPARK_DRIVER_PYTHON_OPTS.
+ checkArgument(appArgs.isEmpty(), "pyspark does not support any application options.");
+
+ Properties props = loadPropertiesFile();
+ mergeEnvPathList(env, getLibPathEnvName(),
+ firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, conf, props));
+
+ // Store spark-submit arguments in an environment variable, since there's no way to pass
+ // them to shell.py on the command line.
+ StringBuilder submitArgs = new StringBuilder();
+ for (String arg : buildSparkSubmitArgs()) {
+ if (submitArgs.length() > 0) {
+ submitArgs.append(" ");
+ }
+ submitArgs.append(quoteForPython(arg));
+ }
+ env.put("PYSPARK_SUBMIT_ARGS", submitArgs.toString());
+
+ List<String> pyargs = new ArrayList<String>();
+ pyargs.add(firstNonEmpty(System.getenv("PYSPARK_DRIVER_PYTHON"), "python"));
+ String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
+ if (!isEmpty(pyOpts)) {
+ pyargs.addAll(parseOptionString(pyOpts));
+ }
+
+ return pyargs;
+ }
+
+ private boolean isClientMode(Properties userProps) {
+ String userMaster = firstNonEmpty(master, (String) userProps.get(SparkLauncher.SPARK_MASTER));
+ // Default master is "local[*]", so assume client mode in that case.
+ return userMaster == null ||
+ "client".equals(deployMode) ||
+ (!userMaster.equals("yarn-cluster") && deployMode == null);
+ }
+
+ private class OptionParser extends SparkSubmitOptionParser {
+
+ private final List<String> driverJvmKeys = Arrays.asList(
+ SparkLauncher.DRIVER_EXTRA_CLASSPATH,
+ SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
+ SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH,
+ SparkLauncher.DRIVER_MEMORY);
+
+ @Override
+ protected boolean handle(String opt, String value) {
+ if (opt.equals(MASTER)) {
+ master = value;
+ } else if (opt.equals(DEPLOY_MODE)) {
+ deployMode = value;
+ } else if (opt.equals(PROPERTIES_FILE)) {
+ propertiesFile = value;
+ } else if (opt.equals(DRIVER_MEMORY)) {
+ conf.put(SparkLauncher.DRIVER_MEMORY, value);
+ } else if (opt.equals(DRIVER_JAVA_OPTIONS)) {
+ conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
+ } else if (opt.equals(DRIVER_LIBRARY_PATH)) {
+ conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
+ } else if (opt.equals(DRIVER_CLASS_PATH)) {
+ conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
+ } else if (opt.equals(CONF)) {
+ String[] setConf = value.split("=", 2);
+ checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
+ if (driverJvmKeys.contains(setConf[0])) {
+ conf.put(setConf[0], setConf[1]);
+ }
+ } else if (opt.equals(CLASS)) {
+ // The special classes require some special command line handling, since they allow
+ // mixing spark-submit arguments with arguments that should be propagated to the shell
+ // itself. Note that for this to work, the "--class" argument must come before any
+ // non-spark-submit arguments.
+ mainClass = value;
+ if (specialClasses.containsKey(value)) {
+ allowsMixedArguments = true;
+ appResource = specialClasses.get(value);
+ }
+ } else {
+ sparkArgs.add(opt);
+ if (value != null) {
+ sparkArgs.add(value);
+ }
+ }
+ return true;
+ }
+
+ @Override
+ protected boolean handleUnknown(String opt) {
+ // When mixing arguments, add unrecognized parameters directly to the user arguments list. In
+ // normal mode, any unrecognized parameter triggers the end of command line parsing, and the
+ // parameter itself will be interpreted by SparkSubmit as the application resource. The
+ // remaining params will be appended to the list of SparkSubmit arguments.
+ if (allowsMixedArguments) {
+ appArgs.add(opt);
+ return true;
+ } else {
+ sparkArgs.add(opt);
+ return false;
+ }
+ }
+
+ @Override
+ protected void handleExtraArgs(List<String> extra) {
+ sparkArgs.addAll(extra);
+ }
+
+ }
+
+}
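A minimal sketch of the driver-memory precedence described in buildSparkSubmitCommand() above. This is hypothetical in-package code (the builder is package-private) and assumes a valid SPARK_HOME is available via childEnv or the environment:

    package org.apache.spark.launcher;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class MemoryPrecedenceExample {
      public static void main(String[] args) throws Exception {
        SparkSubmitCommandBuilder builder = new SparkSubmitCommandBuilder();
        builder.master = "local";
        builder.appResource = "spark-internal";
        // Explicit configuration has the highest precedence, so the resulting
        // command should contain "-Xms4g" and "-Xmx4g" even if SPARK_DRIVER_MEMORY
        // or SPARK_MEM is set in the environment.
        builder.conf.put(SparkLauncher.DRIVER_MEMORY, "4g");
        Map<String, String> env = new HashMap<String, String>();
        List<String> cmd = builder.buildCommand(env);
        System.out.println(cmd);
      }
    }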
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
new file mode 100644
index 0000000000..8526d2e7cf
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Parser for spark-submit command line options.
+ * <p/>
+ * This class encapsulates the parsing code for spark-submit command line options, so that there
+ * is a single list of options that needs to be maintained (well, sort of, but it makes it harder
+ * to break things).
+ */
+class SparkSubmitOptionParser {
+
+ // The following constants define the "main" name for the available options. They're defined
+ // to avoid copy & paste of the raw strings where they're needed.
+ //
+ // The fields are not static so that they're exposed to Scala code that uses this class. See
+ // SparkSubmitArguments.scala. That is also why this class is not abstract - to allow code to
+ // easily use these constants without having to create dummy implementations of this class.
+ protected final String CLASS = "--class";
+ protected final String CONF = "--conf";
+ protected final String DEPLOY_MODE = "--deploy-mode";
+ protected final String DRIVER_CLASS_PATH = "--driver-class-path";
+ protected final String DRIVER_CORES = "--driver-cores";
+ protected final String DRIVER_JAVA_OPTIONS = "--driver-java-options";
+ protected final String DRIVER_LIBRARY_PATH = "--driver-library-path";
+ protected final String DRIVER_MEMORY = "--driver-memory";
+ protected final String EXECUTOR_MEMORY = "--executor-memory";
+ protected final String FILES = "--files";
+ protected final String JARS = "--jars";
+ protected final String KILL_SUBMISSION = "--kill";
+ protected final String MASTER = "--master";
+ protected final String NAME = "--name";
+ protected final String PACKAGES = "--packages";
+ protected final String PROPERTIES_FILE = "--properties-file";
+ protected final String PROXY_USER = "--proxy-user";
+ protected final String PY_FILES = "--py-files";
+ protected final String REPOSITORIES = "--repositories";
+ protected final String STATUS = "--status";
+ protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";
+
+ // Options that do not take arguments.
+ protected final String HELP = "--help";
+ protected final String SUPERVISE = "--supervise";
+ protected final String VERBOSE = "--verbose";
+ protected final String VERSION = "--version";
+
+ // Standalone-only options.
+
+ // YARN-only options.
+ protected final String ARCHIVES = "--archives";
+ protected final String EXECUTOR_CORES = "--executor-cores";
+ protected final String QUEUE = "--queue";
+ protected final String NUM_EXECUTORS = "--num-executors";
+
+ /**
+ * This is the canonical list of spark-submit options. Each entry in the array contains the
+ * different aliases for the same option; the first element of each entry is the "official"
+ * name of the option, passed to {@link #handle(String, String)}.
+ * <p/>
+ * Options listed neither here nor in the "switch" list below will result in a call to
+ * {@link #handleUnknown(String)}.
+ * <p/>
+ * These two arrays are visible for tests.
+ */
+ final String[][] opts = {
+ { ARCHIVES },
+ { CLASS },
+ { CONF, "-c" },
+ { DEPLOY_MODE },
+ { DRIVER_CLASS_PATH },
+ { DRIVER_CORES },
+ { DRIVER_JAVA_OPTIONS },
+ { DRIVER_LIBRARY_PATH },
+ { DRIVER_MEMORY },
+ { EXECUTOR_CORES },
+ { EXECUTOR_MEMORY },
+ { FILES },
+ { JARS },
+ { KILL_SUBMISSION },
+ { MASTER },
+ { NAME },
+ { NUM_EXECUTORS },
+ { PACKAGES },
+ { PROPERTIES_FILE },
+ { PROXY_USER },
+ { PY_FILES },
+ { QUEUE },
+ { REPOSITORIES },
+ { STATUS },
+ { TOTAL_EXECUTOR_CORES },
+ };
+
+ /**
+ * List of switches (command line options that do not take parameters) recognized by spark-submit.
+ */
+ final String[][] switches = {
+ { HELP, "-h" },
+ { SUPERVISE },
+ { VERBOSE, "-v" },
+ { VERSION },
+ };
+
+ /**
+ * Parse a list of spark-submit command line options.
+ * <p/>
+ * See SparkSubmitArguments.scala for a more formal description of available options.
+ *
+ * @throws IllegalArgumentException If an error is found during parsing.
+ */
+ protected final void parse(List<String> args) {
+ Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");
+
+ int idx = 0;
+ for (; idx < args.size(); idx++) {
+ String arg = args.get(idx);
+ String value = null;
+
+ Matcher m = eqSeparatedOpt.matcher(arg);
+ if (m.matches()) {
+ arg = m.group(1);
+ value = m.group(2);
+ }
+
+ // Look for options with a value.
+ String name = findCliOption(arg, opts);
+ if (name != null) {
+ if (value == null) {
+ if (idx == args.size() - 1) {
+ throw new IllegalArgumentException(
+ String.format("Missing argument for option '%s'.", arg));
+ }
+ idx++;
+ value = args.get(idx);
+ }
+ if (!handle(name, value)) {
+ break;
+ }
+ continue;
+ }
+
+ // Look for a switch.
+ name = findCliOption(arg, switches);
+ if (name != null) {
+ if (!handle(name, null)) {
+ break;
+ }
+ continue;
+ }
+
+ if (!handleUnknown(arg)) {
+ break;
+ }
+ }
+
+ if (idx < args.size()) {
+ idx++;
+ }
+ handleExtraArgs(args.subList(idx, args.size()));
+ }
+
+ /**
+ * Callback for when an option with an argument is parsed.
+ *
+ * @param opt The long name of the cli option (might differ from actual command line).
+ * @param value The value. This will be <i>null</i> if the option does not take a value.
+ * @return Whether to continue parsing the argument list.
+ */
+ protected boolean handle(String opt, String value) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Callback for when an unrecognized option is parsed.
+ *
+ * @param opt Unrecognized option from the command line.
+ * @return Whether to continue parsing the argument list.
+ */
+ protected boolean handleUnknown(String opt) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Callback for remaining command line arguments after either {@link #handle(String, String)} or
+ * {@link #handleUnknown(String)} returns "false". This will be called at the end of parsing even
+ * when there are no remaining arguments.
+ *
+ * @param extra List of remaining arguments.
+ */
+ protected void handleExtraArgs(List<String> extra) {
+ throw new UnsupportedOperationException();
+ }
+
+ private String findCliOption(String name, String[][] available) {
+ for (String[] candidates : available) {
+ for (String candidate : candidates) {
+ if (candidate.equals(name)) {
+ return candidates[0];
+ }
+ }
+ }
+ return null;
+ }
+
+}
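A minimal sketch of the subclassing contract (hypothetical in-package code, since the parser is package-private): collect recognized options, and stop at the first unknown argument so the remaining arguments are delivered to handleExtraArgs():

    package org.apache.spark.launcher;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class CollectingParser extends SparkSubmitOptionParser {

      // Recognized options, keyed by their canonical ("official") name.
      final Map<String, String> parsedOpts = new HashMap<String, String>();
      // Everything from the first unrecognized argument onwards.
      final List<String> extras = new ArrayList<String>();

      @Override
      protected boolean handle(String opt, String value) {
        parsedOpts.put(opt, value);
        return true; // keep parsing
      }

      @Override
      protected boolean handleUnknown(String opt) {
        extras.add(opt);
        return false; // stop; the rest goes to handleExtraArgs()
      }

      @Override
      protected void handleExtraArgs(List<String> extra) {
        extras.addAll(extra);
      }
    }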
diff --git a/launcher/src/main/java/org/apache/spark/launcher/package-info.java b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
new file mode 100644
index 0000000000..7ed756f4b8
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Library for launching Spark applications.
+ * <p/>
+ * This library allows applications to launch Spark programmatically. There's only one entry
+ * point to the library - the {@link org.apache.spark.launcher.SparkLauncher} class.
+ * <p/>
+ * To launch a Spark application, just instantiate a {@link org.apache.spark.launcher.SparkLauncher}
+ * and configure the application to run. For example:
+ *
+ * <pre>
+ * {@code
+ * import org.apache.spark.launcher.SparkLauncher;
+ *
+ * public class MyLauncher {
+ * public static void main(String[] args) throws Exception {
+ * Process spark = new SparkLauncher()
+ * .setAppResource("/my/app.jar")
+ * .setMainClass("my.spark.app.Main")
+ * .setMaster("local")
+ * .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
+ * .launch();
+ * spark.waitFor();
+ * }
+ * }
+ * }
+ * </pre>
+ */
+package org.apache.spark.launcher;
diff --git a/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java b/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java
new file mode 100644
index 0000000000..dba0203867
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+public class CommandBuilderUtilsSuite {
+
+ @Test
+ public void testValidOptionStrings() {
+ testOpt("a b c d e", Arrays.asList("a", "b", "c", "d", "e"));
+ testOpt("a 'b c' \"d\" e", Arrays.asList("a", "b c", "d", "e"));
+ testOpt("a 'b\\\"c' \"'d'\" e", Arrays.asList("a", "b\\\"c", "'d'", "e"));
+ testOpt("a 'b\"c' \"\\\"d\\\"\" e", Arrays.asList("a", "b\"c", "\"d\"", "e"));
+ testOpt(" a b c \\\\ ", Arrays.asList("a", "b", "c", "\\"));
+
+ // Following tests ported from UtilsSuite.scala.
+ testOpt("", new ArrayList<String>());
+ testOpt("a", Arrays.asList("a"));
+ testOpt("aaa", Arrays.asList("aaa"));
+ testOpt("a b c", Arrays.asList("a", "b", "c"));
+ testOpt(" a b\t c ", Arrays.asList("a", "b", "c"));
+ testOpt("a 'b c'", Arrays.asList("a", "b c"));
+ testOpt("a 'b c' d", Arrays.asList("a", "b c", "d"));
+ testOpt("'b c'", Arrays.asList("b c"));
+ testOpt("a \"b c\"", Arrays.asList("a", "b c"));
+ testOpt("a \"b c\" d", Arrays.asList("a", "b c", "d"));
+ testOpt("\"b c\"", Arrays.asList("b c"));
+ testOpt("a 'b\" c' \"d' e\"", Arrays.asList("a", "b\" c", "d' e"));
+ testOpt("a\t'b\nc'\nd", Arrays.asList("a", "b\nc", "d"));
+ testOpt("a \"b\\\\c\"", Arrays.asList("a", "b\\c"));
+ testOpt("a \"b\\\"c\"", Arrays.asList("a", "b\"c"));
+ testOpt("a 'b\\\"c'", Arrays.asList("a", "b\\\"c"));
+ testOpt("'a'b", Arrays.asList("ab"));
+ testOpt("'a''b'", Arrays.asList("ab"));
+ testOpt("\"a\"b", Arrays.asList("ab"));
+ testOpt("\"a\"\"b\"", Arrays.asList("ab"));
+ testOpt("''", Arrays.asList(""));
+ testOpt("\"\"", Arrays.asList(""));
+ }
+
+ @Test
+ public void testInvalidOptionStrings() {
+ testInvalidOpt("\\");
+ testInvalidOpt("\"abcde");
+ testInvalidOpt("'abcde");
+ }
+
+ @Test
+ public void testWindowsBatchQuoting() {
+ assertEquals("abc", quoteForBatchScript("abc"));
+ assertEquals("\"a b c\"", quoteForBatchScript("a b c"));
+ assertEquals("\"a \"\"b\"\" c\"", quoteForBatchScript("a \"b\" c"));
+ assertEquals("\"a\"\"b\"\"c\"", quoteForBatchScript("a\"b\"c"));
+ assertEquals("\"ab^=\"\"cd\"\"\"", quoteForBatchScript("ab=\"cd\""));
+ }
+
+ @Test
+ public void testPythonArgQuoting() {
+ assertEquals("\"abc\"", quoteForPython("abc"));
+ assertEquals("\"a b c\"", quoteForPython("a b c"));
+ assertEquals("\"a \\\"b\\\" c\"", quoteForPython("a \"b\" c"));
+ }
+
+ private void testOpt(String opts, List<String> expected) {
+ assertEquals(String.format("test string failed to parse: [[ %s ]]", opts),
+ expected, parseOptionString(opts));
+ }
+
+ private void testInvalidOpt(String opts) {
+ try {
+ parseOptionString(opts);
+ fail("Expected exception for invalid option string.");
+ } catch (IllegalArgumentException e) {
+ // pass.
+ }
+ }
+
+}
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
new file mode 100644
index 0000000000..252d5abae1
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
+/**
+ * These tests require the Spark assembly to be built before they can be run.
+ */
+public class SparkLauncherSuite {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
+
+ @Test
+ public void testChildProcLauncher() throws Exception {
+ Map<String, String> env = new HashMap<String, String>();
+ env.put("SPARK_PRINT_LAUNCH_COMMAND", "1");
+
+ SparkLauncher launcher = new SparkLauncher(env)
+ .setSparkHome(System.getProperty("spark.test.home"))
+ .setMaster("local")
+ .setAppResource("spark-internal")
+ .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
+ "-Dfoo=bar -Dtest.name=-testChildProcLauncher")
+ .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
+ .setMainClass(SparkLauncherTestApp.class.getName())
+ .addAppArgs("proc");
+ final Process app = launcher.launch();
+ new Redirector("stdout", app.getInputStream()).start();
+ new Redirector("stderr", app.getErrorStream()).start();
+ assertEquals(0, app.waitFor());
+ }
+
+ public static class SparkLauncherTestApp {
+
+ public static void main(String[] args) throws Exception {
+ assertEquals(1, args.length);
+ assertEquals("proc", args[0]);
+ assertEquals("bar", System.getProperty("foo"));
+ assertEquals("local", System.getProperty(SparkLauncher.SPARK_MASTER));
+ }
+
+ }
+
+ private static class Redirector extends Thread {
+
+ private final InputStream in;
+
+ Redirector(String name, InputStream in) {
+ this.in = in;
+ setName(name);
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
+ String line;
+ while ((line = reader.readLine()) != null) {
+ LOG.warn(line);
+ }
+ } catch (Exception e) {
+ LOG.error("Error reading process output.", e);
+ }
+ }
+
+ }
+
+}
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
new file mode 100644
index 0000000000..815edc4e49
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class SparkSubmitCommandBuilderSuite {
+
+ private static File dummyPropsFile;
+ private static SparkSubmitOptionParser parser;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ dummyPropsFile = File.createTempFile("spark", "properties");
+ parser = new SparkSubmitOptionParser();
+ }
+
+ @AfterClass
+ public static void cleanUp() throws Exception {
+ dummyPropsFile.delete();
+ }
+
+ @Test
+ public void testDriverCmdBuilder() throws Exception {
+ testCmdBuilder(true);
+ }
+
+ @Test
+ public void testClusterCmdBuilder() throws Exception {
+ testCmdBuilder(false);
+ }
+
+ @Test
+ public void testCliParser() throws Exception {
+ List<String> sparkSubmitArgs = Arrays.asList(
+ parser.MASTER,
+ "local",
+ parser.DRIVER_MEMORY,
+ "42g",
+ parser.DRIVER_CLASS_PATH,
+ "/driverCp",
+ parser.DRIVER_JAVA_OPTIONS,
+ "extraJavaOpt",
+ parser.CONF,
+ SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH + "=/driverLibPath");
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> cmd = buildCommand(sparkSubmitArgs, env);
+
+ assertTrue(findInStringList(env.get(CommandBuilderUtils.getLibPathEnvName()),
+ File.pathSeparator, "/driverLibPath"));
+ assertTrue(findInStringList(findArgValue(cmd, "-cp"), File.pathSeparator, "/driverCp"));
+ assertTrue("Driver -Xms should be configured.", cmd.contains("-Xms42g"));
+ assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx42g"));
+ }
+
+ @Test
+ public void testShellCliParser() throws Exception {
+ List<String> sparkSubmitArgs = Arrays.asList(
+ parser.CLASS,
+ "org.apache.spark.repl.Main",
+ parser.MASTER,
+ "foo",
+ "--app-arg",
+ "bar",
+ "--app-switch",
+ parser.FILES,
+ "baz",
+ parser.NAME,
+ "appName");
+
+ List<String> args = new SparkSubmitCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
+ List<String> expected = Arrays.asList("spark-shell", "--app-arg", "bar", "--app-switch");
+ assertEquals(expected, args.subList(args.size() - expected.size(), args.size()));
+ }
+
+ @Test
+ public void testAlternateSyntaxParsing() throws Exception {
+ List<String> sparkSubmitArgs = Arrays.asList(
+ parser.CLASS + "=org.my.Class",
+ parser.MASTER + "=foo",
+ parser.DEPLOY_MODE + "=bar");
+
+ List<String> cmd = new SparkSubmitCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
+ assertEquals("org.my.Class", findArgValue(cmd, parser.CLASS));
+ assertEquals("foo", findArgValue(cmd, parser.MASTER));
+ assertEquals("bar", findArgValue(cmd, parser.DEPLOY_MODE));
+ }
+
+ @Test
+ public void testPySparkLauncher() throws Exception {
+ List<String> sparkSubmitArgs = Arrays.asList(
+ SparkSubmitCommandBuilder.PYSPARK_SHELL,
+ "--master=foo",
+ "--deploy-mode=bar");
+
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> cmd = buildCommand(sparkSubmitArgs, env);
+ assertEquals("python", cmd.get(cmd.size() - 1));
+ assertEquals(
+ String.format("\"%s\" \"foo\" \"%s\" \"bar\" \"%s\"",
+ parser.MASTER, parser.DEPLOY_MODE, SparkSubmitCommandBuilder.PYSPARK_SHELL_RESOURCE),
+ env.get("PYSPARK_SUBMIT_ARGS"));
+ }
+
+ @Test
+ public void testPySparkFallback() throws Exception {
+ List<String> sparkSubmitArgs = Arrays.asList(
+ "--master=foo",
+ "--deploy-mode=bar",
+ "script.py",
+ "arg1");
+
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> cmd = buildCommand(sparkSubmitArgs, env);
+
+ assertEquals("foo", findArgValue(cmd, "--master"));
+ assertEquals("bar", findArgValue(cmd, "--deploy-mode"));
+ assertEquals("script.py", cmd.get(cmd.size() - 2));
+ assertEquals("arg1", cmd.get(cmd.size() - 1));
+ }
+
+ private void testCmdBuilder(boolean isDriver) throws Exception {
+ String deployMode = isDriver ? "client" : "cluster";
+
+ SparkSubmitCommandBuilder launcher =
+ new SparkSubmitCommandBuilder(Collections.<String>emptyList());
+ launcher.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME,
+ System.getProperty("spark.test.home"));
+ launcher.master = "yarn";
+ launcher.deployMode = deployMode;
+ launcher.appResource = "/foo";
+ launcher.appName = "MyApp";
+ launcher.mainClass = "my.Class";
+ launcher.propertiesFile = dummyPropsFile.getAbsolutePath();
+ launcher.appArgs.add("foo");
+ launcher.appArgs.add("bar");
+ launcher.conf.put(SparkLauncher.DRIVER_MEMORY, "1g");
+ launcher.conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, "/driver");
+ launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Ddriver -XX:MaxPermSize=256m");
+ launcher.conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, "/native");
+ launcher.conf.put("spark.foo", "foo");
+
+ Map<String, String> env = new HashMap<String, String>();
+ List<String> cmd = launcher.buildCommand(env);
+
+ // Checks below are different for driver and non-driver mode.
+
+ if (isDriver) {
+ assertTrue("Driver -Xms should be configured.", cmd.contains("-Xms1g"));
+ assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx1g"));
+ } else {
+ boolean found = false;
+ for (String arg : cmd) {
+ if (arg.startsWith("-Xms") || arg.startsWith("-Xmx")) {
+ found = true;
+ break;
+ }
+ }
+ assertFalse("Memory arguments should not be set.", found);
+ }
+
+ for (String arg : cmd) {
+ if (arg.startsWith("-XX:MaxPermSize=")) {
+ if (isDriver) {
+ assertEquals("-XX:MaxPermSize=256m", arg);
+ } else {
+ assertEquals("-XX:MaxPermSize=128m", arg);
+ }
+ }
+ }
+
+ String[] cp = findArgValue(cmd, "-cp").split(Pattern.quote(File.pathSeparator));
+ if (isDriver) {
+ assertTrue("Driver classpath should contain provided entry.", contains("/driver", cp));
+ } else {
+ assertFalse("Driver classpath should not be in command.", contains("/driver", cp));
+ }
+
+ String libPath = env.get(CommandBuilderUtils.getLibPathEnvName());
+ if (isDriver) {
+ assertNotNull("Native library path should be set.", libPath);
+ assertTrue("Native library path should contain provided entry.",
+ contains("/native", libPath.split(Pattern.quote(File.pathSeparator))));
+ } else {
+ assertNull("Native library should not be set.", libPath);
+ }
+
+ // Checks below are the same for both driver and non-driver mode.
+ assertEquals(dummyPropsFile.getAbsolutePath(), findArgValue(cmd, parser.PROPERTIES_FILE));
+ assertEquals("yarn", findArgValue(cmd, parser.MASTER));
+ assertEquals(deployMode, findArgValue(cmd, parser.DEPLOY_MODE));
+ assertEquals("my.Class", findArgValue(cmd, parser.CLASS));
+ assertEquals("MyApp", findArgValue(cmd, parser.NAME));
+
+ boolean appArgsOk = false;
+ for (int i = 0; i < cmd.size(); i++) {
+ if (cmd.get(i).equals("/foo")) {
+ assertEquals("foo", cmd.get(i + 1));
+ assertEquals("bar", cmd.get(i + 2));
+ assertEquals(cmd.size(), i + 3);
+ appArgsOk = true;
+ break;
+ }
+ }
+ assertTrue("App resource and args should be added to command.", appArgsOk);
+
+ Map<String, String> conf = parseConf(cmd, parser);
+ assertEquals("foo", conf.get("spark.foo"));
+ }
+
+ private boolean contains(String needle, String[] haystack) {
+ for (String entry : haystack) {
+ if (entry.equals(needle)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private Map<String, String> parseConf(List<String> cmd, SparkSubmitOptionParser parser) {
+ Map<String, String> conf = new HashMap<String, String>();
+ for (int i = 0; i < cmd.size(); i++) {
+ if (cmd.get(i).equals(parser.CONF)) {
+ String[] val = cmd.get(i + 1).split("=", 2);
+ conf.put(val[0], val[1]);
+ i += 1;
+ }
+ }
+ return conf;
+ }
+
+ private String findArgValue(List<String> cmd, String name) {
+ for (int i = 0; i < cmd.size(); i++) {
+ if (cmd.get(i).equals(name)) {
+ return cmd.get(i + 1);
+ }
+ }
+ fail(String.format("arg '%s' not found", name));
+ return null;
+ }
+
+ private boolean findInStringList(String list, String sep, String needle) {
+ return contains(needle, list.split(sep));
+ }
+
+ private List<String> buildCommand(List<String> args, Map<String, String> env) throws Exception {
+ SparkSubmitCommandBuilder builder = new SparkSubmitCommandBuilder(args);
+ builder.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, System.getProperty("spark.test.home"));
+ return builder.buildCommand(env);
+ }
+
+}
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java
new file mode 100644
index 0000000000..f3d2109917
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import static org.apache.spark.launcher.SparkSubmitOptionParser.*;
+
+public class SparkSubmitOptionParserSuite {
+
+ private SparkSubmitOptionParser parser;
+
+ @Before
+ public void setUp() {
+ parser = spy(new DummyParser());
+ }
+
+ @Test
+ public void testAllOptions() {
+ int count = 0;
+ for (String[] optNames : parser.opts) {
+ for (String optName : optNames) {
+ String value = optName + "-value";
+ parser.parse(Arrays.asList(optName, value));
+ count++;
+ verify(parser).handle(eq(optNames[0]), eq(value));
+ verify(parser, times(count)).handle(anyString(), anyString());
+ verify(parser, times(count)).handleExtraArgs(eq(Collections.<String>emptyList()));
+ }
+ }
+
+ for (String[] switchNames : parser.switches) {
+ int switchCount = 0;
+ for (String name : switchNames) {
+ parser.parse(Arrays.asList(name));
+ count++;
+ switchCount++;
+ verify(parser, times(switchCount)).handle(eq(switchNames[0]), same((String) null));
+ verify(parser, times(count)).handle(anyString(), any(String.class));
+ verify(parser, times(count)).handleExtraArgs(eq(Collections.<String>emptyList()));
+ }
+ }
+ }
+
+ @Test
+ public void testExtraOptions() {
+ List<String> args = Arrays.asList(parser.MASTER, parser.MASTER, "foo", "bar");
+ parser.parse(args);
+ verify(parser).handle(eq(parser.MASTER), eq(parser.MASTER));
+ verify(parser).handleUnknown(eq("foo"));
+ verify(parser).handleExtraArgs(eq(Arrays.asList("bar")));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testMissingArg() {
+ parser.parse(Arrays.asList(parser.MASTER));
+ }
+
+ @Test
+ public void testEqualSeparatedOption() {
+ List<String> args = Arrays.asList(parser.MASTER + "=" + parser.MASTER);
+ parser.parse(args);
+ verify(parser).handle(eq(parser.MASTER), eq(parser.MASTER));
+ verify(parser).handleExtraArgs(eq(Collections.<String>emptyList()));
+ }
+
+ private static class DummyParser extends SparkSubmitOptionParser {
+
+ @Override
+ protected boolean handle(String opt, String value) {
+ return true;
+ }
+
+ @Override
+ protected boolean handleUnknown(String opt) {
+ return false;
+ }
+
+ @Override
+ protected void handleExtraArgs(List<String> extra) {
+
+ }
+
+ }
+
+}
diff --git a/launcher/src/test/resources/log4j.properties b/launcher/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..00c20ad69c
--- /dev/null
+++ b/launcher/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file core/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+
+# Some tests will set "test.name" to avoid overwriting the main log file.
+log4j.appender.file.file=target/unit-tests${test.name}.log
+
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN
diff --git a/make-distribution.sh b/make-distribution.sh
index dd990d4b96..82d33408cd 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -127,6 +127,7 @@ if [ ! $(command -v "$MVN") ] ; then
fi
VERSION=$("$MVN" help:evaluate -Dexpression=project.version 2>/dev/null | grep -v "INFO" | tail -n 1)
+SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version 2>/dev/null | grep -v "INFO" | tail -n 1)
SPARK_HADOOP_VERSION=$("$MVN" help:evaluate -Dexpression=hadoop.version $@ 2>/dev/null\
| grep -v "INFO"\
| tail -n 1)
@@ -196,6 +197,7 @@ echo "Build flags: $@" >> "$DISTDIR/RELEASE"
# Copy jars
cp "$SPARK_HOME"/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/lib/"
cp "$SPARK_HOME"/examples/target/scala*/spark-examples*.jar "$DISTDIR/lib/"
+cp "$SPARK_HOME"/launcher/target/spark-launcher_$SCALA_VERSION-$VERSION.jar "$DISTDIR/lib/"
# This will fail if the -Pyarn profile is not provided
# In this case, silence the error and ignore the return code of this command
cp "$SPARK_HOME"/network/yarn/target/scala*/spark-*-yarn-shuffle.jar "$DISTDIR/lib/" &> /dev/null || :
diff --git a/pom.xml b/pom.xml
index 51bef30f9c..a19da73cf4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,7 @@
<module>external/zeromq</module>
<module>examples</module>
<module>repl</module>
+ <module>launcher</module>
</modules>
<properties>
@@ -1195,7 +1196,7 @@
</environmentVariables>
<systemProperties>
<java.awt.headless>true</java.awt.headless>
- <spark.test.home>${session.executionRootDirectory}</spark.test.home>
+ <spark.test.home>${spark.test.home}</spark.test.home>
<spark.testing>1</spark.testing>
<spark.ui.enabled>false</spark.ui.enabled>
<spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 4f17df59f4..35e748f26b 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -34,11 +34,11 @@ object BuildCommons {
val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl,
sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka,
- streamingMqtt, streamingTwitter, streamingZeromq) =
+ streamingMqtt, streamingTwitter, streamingZeromq, launcher) =
Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl",
"sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink",
"streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
- "streaming-zeromq").map(ProjectRef(buildLocation, _))
+ "streaming-zeromq", "launcher").map(ProjectRef(buildLocation, _))
val optionallyEnabledProjects@Seq(yarn, yarnStable, java8Tests, sparkGangliaLgpl,
sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
@@ -155,8 +155,9 @@ object SparkBuild extends PomBuild {
(allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings))
// TODO: Add Sql to mima checks
+ // TODO: remove launcher from this list after 1.3.
allProjects.filterNot(x => Seq(spark, sql, hive, hiveThriftServer, catalyst, repl,
- networkCommon, networkShuffle, networkYarn).contains(x)).foreach {
+ networkCommon, networkShuffle, networkYarn, launcher).contains(x)).foreach {
x => enable(MimaBuild.mimaSettings(sparkHome, x))(x)
}
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 936857e75c..43d2cf5171 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -41,7 +41,7 @@ def launch_gateway():
submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS")
submit_args = submit_args if submit_args is not None else ""
submit_args = shlex.split(submit_args)
- command = [os.path.join(SPARK_HOME, script)] + submit_args + ["pyspark-shell"]
+ command = [os.path.join(SPARK_HOME, script)] + submit_args
# Start a socket that will be used by PythonGatewayServer to communicate its port to us
callback_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -58,7 +58,6 @@ def launch_gateway():
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_func():
signal.signal(signal.SIGINT, signal.SIG_IGN)
- env["IS_SUBPROCESS"] = "1" # tell JVM to exit after python exits
proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
else:
# preexec_fn not supported on Windows
diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh
index 5e812a1d91..92e76a3fe6 100755
--- a/sbin/spark-daemon.sh
+++ b/sbin/spark-daemon.sh
@@ -121,45 +121,63 @@ if [ "$SPARK_NICENESS" = "" ]; then
export SPARK_NICENESS=0
fi
+run_command() {
+ mode="$1"
+ shift
-case $option in
+ mkdir -p "$SPARK_PID_DIR"
- (start|spark-submit)
+ if [ -f "$pid" ]; then
+ TARGET_ID="$(cat "$pid")"
+ if [[ $(ps -p "$TARGET_ID" -o args=) =~ $command ]]; then
+ echo "$command running as process $TARGET_ID. Stop it first."
+ exit 1
+ fi
+ fi
- mkdir -p "$SPARK_PID_DIR"
+ if [ "$SPARK_MASTER" != "" ]; then
+ echo rsync from "$SPARK_MASTER"
+ rsync -a -e ssh --delete --exclude=.svn --exclude='logs/*' --exclude='contrib/hod/logs/*' "$SPARK_MASTER/" "$SPARK_HOME"
+ fi
- if [ -f $pid ]; then
- TARGET_ID="$(cat "$pid")"
- if [[ $(ps -p "$TARGET_ID" -o args=) =~ $command ]]; then
- echo "$command running as process $TARGET_ID. Stop it first."
- exit 1
- fi
- fi
+ spark_rotate_log "$log"
+ echo "starting $command, logging to $log"
+
+ case "$mode" in
+ (class)
+ nohup nice -n "$SPARK_NICENESS" "$SPARK_PREFIX"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null &
+ newpid="$!"
+ ;;
+
+ (submit)
+ nohup nice -n "$SPARK_NICENESS" "$SPARK_PREFIX"/bin/spark-submit --class $command "$@" >> "$log" 2>&1 < /dev/null &
+ newpid="$!"
+ ;;
+
+ (*)
+ echo "unknown mode: $mode"
+ exit 1
+ ;;
+ esac
+
+ echo "$newpid" > "$pid"
+ sleep 2
+ # Check if the process has died; in that case we'll tail the log so the user can see
+ if [[ ! $(ps -p "$newpid" -o args=) =~ $command ]]; then
+ echo "failed to launch $command:"
+ tail -2 "$log" | sed 's/^/ /'
+ echo "full log in $log"
+ fi
+}
- if [ "$SPARK_MASTER" != "" ]; then
- echo rsync from "$SPARK_MASTER"
- rsync -a -e ssh --delete --exclude=.svn --exclude='logs/*' --exclude='contrib/hod/logs/*' $SPARK_MASTER/ "$SPARK_HOME"
- fi
+case $option in
- spark_rotate_log "$log"
- echo "starting $command, logging to $log"
- if [ $option == spark-submit ]; then
- source "$SPARK_HOME"/bin/utils.sh
- gatherSparkSubmitOpts "$@"
- nohup nice -n $SPARK_NICENESS "$SPARK_PREFIX"/bin/spark-submit --class $command \
- "${SUBMISSION_OPTS[@]}" spark-internal "${APPLICATION_OPTS[@]}" >> "$log" 2>&1 < /dev/null &
- else
- nohup nice -n $SPARK_NICENESS "$SPARK_PREFIX"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null &
- fi
- newpid=$!
- echo $newpid > $pid
- sleep 2
- # Check if the process has died; in that case we'll tail the log so the user can see
- if [[ ! $(ps -p "$newpid" -o args=) =~ $command ]]; then
- echo "failed to launch $command:"
- tail -2 "$log" | sed 's/^/ /'
- echo "full log in $log"
- fi
+ (submit)
+ run_command submit "$@"
+ ;;
+
+ (start)
+ run_command class "$@"
;;
(stop)
diff --git a/sbin/start-thriftserver.sh b/sbin/start-thriftserver.sh
index 070cc7a87e..5b0aeb177f 100755
--- a/sbin/start-thriftserver.sh
+++ b/sbin/start-thriftserver.sh
@@ -52,4 +52,4 @@ fi
export SUBMIT_USAGE_FUNCTION=usage
-exec "$FWDIR"/sbin/spark-daemon.sh spark-submit $CLASS 1 "$@"
+exec "$FWDIR"/sbin/spark-daemon.sh submit $CLASS 1 "$@"