Diffstat (lines changed per file):

   49  -rwxr-xr-x  bin/spark-class
   28  -rwxr-xr-x  bin/spark-submit
    0  -rwxr-xr-x  bin/utils.sh  (mode was -rw-r--r--)
   10  -rw-r--r--  conf/spark-defaults.conf.template
   25  -rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
    3  -rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
    4  -rw-r--r--  core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
   17  -rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
  149  -rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
   21  -rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala

10 files changed, 250 insertions(+), 56 deletions(-)
diff --git a/bin/spark-class b/bin/spark-class
index 3f6beca5be..22acf92288 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -17,6 +17,8 @@
# limitations under the License.
#
+# NOTE: Any changes to this file must be reflected in SparkSubmitDriverBootstrapper.scala!
+
cygwin=false
case "`uname`" in
CYGWIN*) cygwin=true;;
@@ -39,7 +41,7 @@ fi
if [ -n "$SPARK_MEM" ]; then
echo -e "Warning: SPARK_MEM is deprecated, please use a more specific config option" 1>&2
- echo -e "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)." 1>&2
+ echo -e "(e.g., spark.executor.memory or spark.driver.memory)." 1>&2
fi
# Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
@@ -73,11 +75,17 @@ case "$1" in
OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
;;
- # Spark submit uses SPARK_SUBMIT_OPTS and SPARK_JAVA_OPTS
- 'org.apache.spark.deploy.SparkSubmit')
- OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS \
- -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH"
+ # Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
+ # SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY.
+ 'org.apache.spark.deploy.SparkSubmit')
+ OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS"
OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
+ if [ -n "$SPARK_SUBMIT_LIBRARY_PATH" ]; then
+ OUR_JAVA_OPTS="$OUR_JAVA_OPTS -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH"
+ fi
+ if [ -n "$SPARK_SUBMIT_DRIVER_MEMORY" ]; then
+ OUR_JAVA_MEM="$SPARK_SUBMIT_DRIVER_MEMORY"
+ fi
;;
*)
@@ -101,11 +109,12 @@ fi
# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"
+
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
fi
-export JAVA_OPTS
+
# Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!
TOOLS_DIR="$FWDIR"/tools
@@ -146,10 +155,28 @@ if $cygwin; then
fi
export CLASSPATH
-if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
- echo -n "Spark Command: " 1>&2
- echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2
- echo -e "========================================\n" 1>&2
+# In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself.
+# Here we must parse the properties file for relevant "spark.driver.*" configs before launching
+# the driver JVM itself. Instead of handling this complexity in Bash, we launch a separate JVM
+# to prepare the launch environment of this driver JVM.
+
+if [ -n "$SPARK_SUBMIT_BOOTSTRAP_DRIVER" ]; then
+ # This is used only if the properties file actually contains these special configs
+ # Export the environment variables needed by SparkSubmitDriverBootstrapper
+ export RUNNER
+ export CLASSPATH
+ export JAVA_OPTS
+ export OUR_JAVA_MEM
+ export SPARK_CLASS=1
+ shift # Ignore main class (org.apache.spark.deploy.SparkSubmit) and use our own
+ exec "$RUNNER" org.apache.spark.deploy.SparkSubmitDriverBootstrapper "$@"
+else
+ # Note: The format of this command is closely echoed in SparkSubmitDriverBootstrapper.scala
+ if [ -n "$SPARK_PRINT_LAUNCH_COMMAND" ]; then
+ echo -n "Spark Command: " 1>&2
+ echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2
+ echo -e "========================================\n" 1>&2
+ fi
+ exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
fi
-exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
diff --git a/bin/spark-submit b/bin/spark-submit
index 9e7cecedd0..32c911cd04 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -17,14 +17,18 @@
# limitations under the License.
#
+# NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
+
export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
ORIG_ARGS=("$@")
while (($#)); do
if [ "$1" = "--deploy-mode" ]; then
- DEPLOY_MODE=$2
+ SPARK_SUBMIT_DEPLOY_MODE=$2
+ elif [ "$1" = "--properties-file" ]; then
+ SPARK_SUBMIT_PROPERTIES_FILE=$2
elif [ "$1" = "--driver-memory" ]; then
- DRIVER_MEMORY=$2
+ export SPARK_SUBMIT_DRIVER_MEMORY=$2
elif [ "$1" = "--driver-library-path" ]; then
export SPARK_SUBMIT_LIBRARY_PATH=$2
elif [ "$1" = "--driver-class-path" ]; then
@@ -35,10 +39,24 @@ while (($#)); do
shift
done
-DEPLOY_MODE=${DEPLOY_MODE:-"client"}
+DEFAULT_PROPERTIES_FILE="$SPARK_HOME/conf/spark-defaults.conf"
+export SPARK_SUBMIT_DEPLOY_MODE=${SPARK_SUBMIT_DEPLOY_MODE:-"client"}
+export SPARK_SUBMIT_PROPERTIES_FILE=${SPARK_SUBMIT_PROPERTIES_FILE:-"$DEFAULT_PROPERTIES_FILE"}
+
+# For client mode, the driver will be launched in the same JVM that launches
+# SparkSubmit, so we may need to read the properties file for any extra class
+# paths, library paths, java options and memory early on. Otherwise, it will
+# be too late by the time the driver JVM has started.
-if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then
- export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY
+if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FILE" ]]; then
+ # Parse the properties file only if the special configs exist
+ contains_special_configs=$(
+ grep -e "spark.driver.extra*\|spark.driver.memory" "$SPARK_SUBMIT_PROPERTIES_FILE" | \
+ grep -v "^[[:space:]]*#"
+ )
+ if [ -n "$contains_special_configs" ]; then
+ export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
+ fi
fi
exec $SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
diff --git a/bin/utils.sh b/bin/utils.sh
index 0804b1ed9f..0804b1ed9f 100644..100755
--- a/bin/utils.sh
+++ b/bin/utils.sh
diff --git a/conf/spark-defaults.conf.template b/conf/spark-defaults.conf.template
index 2779342769..94427029b9 100644
--- a/conf/spark-defaults.conf.template
+++ b/conf/spark-defaults.conf.template
@@ -2,7 +2,9 @@
# This is useful for setting default environmental settings.
# Example:
-# spark.master spark://master:7077
-# spark.eventLog.enabled true
-# spark.eventLog.dir hdfs://namenode:8021/directory
-# spark.serializer org.apache.spark.serializer.KryoSerializer
+# spark.master spark://master:7077
+# spark.eventLog.enabled true
+# spark.eventLog.dir hdfs://namenode:8021/directory
+# spark.serializer org.apache.spark.serializer.KryoSerializer
+# spark.driver.memory 5g
+# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
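
The bootstrapper introduced later in this change has to read these settings before the driver JVM is started. As a rough sketch of how a whitespace-separated defaults file like the template above can be parsed (Spark's own implementation is SparkSubmitArguments.getPropertiesFromFile; the object name and file path below are illustrative only):

    import java.io.{File, FileInputStream, InputStreamReader}
    import java.util.Properties

    import scala.collection.JavaConverters._

    object ReadSparkDefaultsSketch {
      def main(args: Array[String]): Unit = {
        // Illustrative path; spark-submit resolves the real file under $SPARK_HOME/conf
        val file = new File("conf/spark-defaults.conf")
        val props = new Properties()
        val reader = new InputStreamReader(new FileInputStream(file), "UTF-8")
        // java.util.Properties accepts whitespace-separated "key value" pairs and "#" comments
        try props.load(reader) finally reader.close()
        val conf = props.stringPropertyNames.asScala.map(k => k -> props.getProperty(k).trim).toMap
        conf.get("spark.driver.memory").foreach(m => println(s"spark.driver.memory = $m"))
        conf.get("spark.driver.extraJavaOptions").foreach(o => println(s"spark.driver.extraJavaOptions = $o"))
      }
    }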
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 52c70712ee..be5ebfa921 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -40,28 +40,3 @@ private[spark] object PythonUtils {
paths.filter(_ != "").mkString(File.pathSeparator)
}
}
-
-
-/**
- * A utility class to redirect the child process's stdout or stderr.
- */
-private[spark] class RedirectThread(
- in: InputStream,
- out: OutputStream,
- name: String)
- extends Thread(name) {
-
- setDaemon(true)
- override def run() {
- scala.util.control.Exception.ignoring(classOf[IOException]) {
- // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
- val buf = new Array[Byte](1024)
- var len = in.read(buf)
- while (len != -1) {
- out.write(buf, 0, len)
- out.flush()
- len = in.read(buf)
- }
- }
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index bf716a8ab0..4c4796f6c5 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -17,7 +17,6 @@
package org.apache.spark.api.python
-import java.lang.Runtime
import java.io.{DataOutputStream, DataInputStream, InputStream, OutputStreamWriter}
import java.net.{InetAddress, ServerSocket, Socket, SocketException}
@@ -25,7 +24,7 @@ import scala.collection.mutable
import scala.collection.JavaConversions._
import org.apache.spark._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{RedirectThread, Utils}
private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 0d6751f3fa..b66c3ba4d5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -22,8 +22,8 @@ import java.net.URI
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
-import org.apache.spark.api.python.{PythonUtils, RedirectThread}
-import org.apache.spark.util.Utils
+import org.apache.spark.api.python.PythonUtils
+import org.apache.spark.util.{RedirectThread, Utils}
/**
* A main class used by spark-submit to launch Python applications. It executes python as a
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 318509a67a..f8cdbc3c39 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -195,18 +195,21 @@ object SparkSubmit {
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
// Other options
- OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER,
- sysProp = "spark.driver.extraClassPath"),
- OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER,
- sysProp = "spark.driver.extraJavaOptions"),
- OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
- sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
- sysProp = "spark.files")
+ sysProp = "spark.files"),
+
+ // Only process driver specific options for cluster mode here,
+ // because they have already been processed in bash for client mode
+ OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER,
+ sysProp = "spark.driver.extraClassPath"),
+ OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER,
+ sysProp = "spark.driver.extraJavaOptions"),
+ OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
+ sysProp = "spark.driver.extraLibraryPath")
)
// In client mode, launch the application main class directly
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
new file mode 100644
index 0000000000..af607e6a4a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.File
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.util.{RedirectThread, Utils}
+
+/**
+ * Launch an application through Spark submit in client mode with the appropriate classpath,
+ * library paths, java options and memory. These properties of the JVM must be set before the
+ * driver JVM is launched. The sole purpose of this class is to avoid handling the complexity
+ * of parsing the properties file for such relevant configs in Bash.
+ *
+ * Usage: org.apache.spark.deploy.SparkSubmitDriverBootstrapper <submit args>
+ */
+private[spark] object SparkSubmitDriverBootstrapper {
+
+ // Note: This class depends on the behavior of `bin/spark-class` and `bin/spark-submit`.
+ // Any changes made there must be reflected in this file.
+
+ def main(args: Array[String]): Unit = {
+
+ // This should be called only from `bin/spark-class`
+ if (!sys.env.contains("SPARK_CLASS")) {
+ System.err.println("SparkSubmitDriverBootstrapper must be called from `bin/spark-class`!")
+ System.exit(1)
+ }
+
+ val submitArgs = args
+ val runner = sys.env("RUNNER")
+ val classpath = sys.env("CLASSPATH")
+ val javaOpts = sys.env("JAVA_OPTS")
+ val defaultDriverMemory = sys.env("OUR_JAVA_MEM")
+
+ // Spark submit specific environment variables
+ val deployMode = sys.env("SPARK_SUBMIT_DEPLOY_MODE")
+ val propertiesFile = sys.env("SPARK_SUBMIT_PROPERTIES_FILE")
+ val bootstrapDriver = sys.env("SPARK_SUBMIT_BOOTSTRAP_DRIVER")
+ val submitDriverMemory = sys.env.get("SPARK_SUBMIT_DRIVER_MEMORY")
+ val submitLibraryPath = sys.env.get("SPARK_SUBMIT_LIBRARY_PATH")
+ val submitClasspath = sys.env.get("SPARK_SUBMIT_CLASSPATH")
+ val submitJavaOpts = sys.env.get("SPARK_SUBMIT_OPTS")
+
+ assume(runner != null, "RUNNER must be set")
+ assume(classpath != null, "CLASSPATH must be set")
+ assume(javaOpts != null, "JAVA_OPTS must be set")
+ assume(defaultDriverMemory != null, "OUR_JAVA_MEM must be set")
+ assume(deployMode == "client", "SPARK_SUBMIT_DEPLOY_MODE must be \"client\"!")
+ assume(propertiesFile != null, "SPARK_SUBMIT_PROPERTIES_FILE must be set")
+ assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set")
+
+ // Parse the properties file for the equivalent spark.driver.* configs
+ val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap
+ val confDriverMemory = properties.get("spark.driver.memory")
+ val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
+ val confClasspath = properties.get("spark.driver.extraClassPath")
+ val confJavaOpts = properties.get("spark.driver.extraJavaOptions")
+
+ // Favor Spark submit arguments over the equivalent configs in the properties file.
+ // Note that we do not actually use the Spark submit values for library path, classpath,
+ // and Java opts here, because we have already captured them in Bash.
+
+ val newDriverMemory = submitDriverMemory
+ .orElse(confDriverMemory)
+ .getOrElse(defaultDriverMemory)
+
+ val newLibraryPath =
+ if (submitLibraryPath.isDefined) {
+ // SPARK_SUBMIT_LIBRARY_PATH is already captured in JAVA_OPTS
+ ""
+ } else {
+ confLibraryPath.map("-Djava.library.path=" + _).getOrElse("")
+ }
+
+ val newClasspath =
+ if (submitClasspath.isDefined) {
+ // SPARK_SUBMIT_CLASSPATH is already captured in CLASSPATH
+ classpath
+ } else {
+ classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("")
+ }
+
+ val newJavaOpts =
+ if (submitJavaOpts.isDefined) {
+ // SPARK_SUBMIT_OPTS is already captured in JAVA_OPTS
+ javaOpts
+ } else {
+ javaOpts + confJavaOpts.map(" " + _).getOrElse("")
+ }
+
+ val filteredJavaOpts = Utils.splitCommandString(newJavaOpts)
+ .filterNot(_.startsWith("-Xms"))
+ .filterNot(_.startsWith("-Xmx"))
+
+ // Build up command
+ val command: Seq[String] =
+ Seq(runner) ++
+ Seq("-cp", newClasspath) ++
+ Seq(newLibraryPath) ++
+ filteredJavaOpts ++
+ Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++
+ Seq("org.apache.spark.deploy.SparkSubmit") ++
+ submitArgs
+
+ // Print the launch command. This follows closely the format used in `bin/spark-class`.
+ if (sys.env.contains("SPARK_PRINT_LAUNCH_COMMAND")) {
+ System.err.print("Spark Command: ")
+ System.err.println(command.mkString(" "))
+ System.err.println("========================================\n")
+ }
+
+ // Start the driver JVM
+ val filteredCommand = command.filter(_.nonEmpty)
+ val builder = new ProcessBuilder(filteredCommand)
+ val process = builder.start()
+
+ // Redirect stdin, stdout, and stderr to/from the child JVM
+ val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
+ val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
+ val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
+ stdinThread.start()
+ stdoutThread.start()
+ stderrThread.start()
+
+ // Terminate on broken pipe, which signals that the parent process has exited. This is
+ // important for the PySpark shell, where Spark submit itself is a python subprocess.
+ stdinThread.join()
+ process.destroy()
+ }
+
+}
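
The precedence applied above (spark-submit flags already captured in Bash win over the properties file, which in turn wins over the default exported by bin/spark-class) can be seen in isolation in the sketch below; the memory values are made up for illustration:

    object DriverMemoryPrecedenceSketch {
      // Mirrors the orElse/getOrElse chain used for newDriverMemory in the bootstrapper
      def resolve(
          submitDriverMemory: Option[String],  // SPARK_SUBMIT_DRIVER_MEMORY (--driver-memory)
          confDriverMemory: Option[String],    // spark.driver.memory from the properties file
          defaultDriverMemory: String          // OUR_JAVA_MEM exported by bin/spark-class
        ): String = {
        submitDriverMemory.orElse(confDriverMemory).getOrElse(defaultDriverMemory)
      }

      def main(args: Array[String]): Unit = {
        println(resolve(Some("2g"), Some("5g"), "512m"))  // "--driver-memory 2g" wins: 2g
        println(resolve(None, Some("5g"), "512m"))        // properties file wins: 5g
        println(resolve(None, None, "512m"))              // neither is set, fall back: 512m
      }
    }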
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d6d74ce269..69a84a3604 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1480,3 +1480,24 @@ private[spark] object Utils extends Logging {
}
}
+
+/**
+ * A utility class to redirect the child process's stdout or stderr.
+ */
+private[spark] class RedirectThread(in: InputStream, out: OutputStream, name: String)
+ extends Thread(name) {
+
+ setDaemon(true)
+ override def run() {
+ scala.util.control.Exception.ignoring(classOf[IOException]) {
+ // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
+ val buf = new Array[Byte](1024)
+ var len = in.read(buf)
+ while (len != -1) {
+ out.write(buf, 0, len)
+ out.flush()
+ len = in.read(buf)
+ }
+ }
+ }
+}
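
For reference, a minimal usage sketch of the relocated RedirectThread, following the same wiring SparkSubmitDriverBootstrapper uses for the driver JVM. RedirectThread is private[spark], so the sketch assumes it is compiled under the org.apache.spark package tree; the package name and the child command ("cat") are illustrative only:

    package org.apache.spark.demo

    import org.apache.spark.util.RedirectThread

    object RedirectThreadSketch {
      def main(args: Array[String]): Unit = {
        // Start a child process, forward our stdin into it and copy its stdout back to us
        val process = new ProcessBuilder("cat").start()
        val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
        val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
        stdinThread.start()
        stdoutThread.start()
        // When our stdin closes (for example on Ctrl-D), stop forwarding and tear the child down
        stdinThread.join()
        process.destroy()
      }
    }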