aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2014-08-20 15:01:47 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-08-20 15:01:47 -0700
commitb3ec51bfd795772ff96d18228e979a52ebc82ec4 (patch)
tree87224252fd2df7b553bd1a824b58b8d26795b600 /core
parentc1ba4cd6b4db22a9325eee50dc40a78593a10de1 (diff)
downloadspark-b3ec51bfd795772ff96d18228e979a52ebc82ec4.tar.gz
spark-b3ec51bfd795772ff96d18228e979a52ebc82ec4.tar.bz2
spark-b3ec51bfd795772ff96d18228e979a52ebc82ec4.zip
[SPARK-2849] Handle driver configs separately in client mode
In client deploy mode, the driver is launched from within `SparkSubmit`'s JVM. This means by the time we parse Spark configs from `spark-defaults.conf`, it is already too late to control certain properties of the driver's JVM. We currently ignore these configs in client mode altogether. ``` spark.driver.memory spark.driver.extraJavaOptions spark.driver.extraClassPath spark.driver.extraLibraryPath ``` This PR handles these properties before launching the driver JVM. It achieves this by spawning a separate JVM that runs a new class called `SparkSubmitDriverBootstrapper`, which spawns `SparkSubmit` as a sub-process with the appropriate classpath, library paths, java opts and memory. Author: Andrew Or <andrewor14@gmail.com> Closes #1845 from andrewor14/handle-configs-bash and squashes the following commits: bed4bdf [Andrew Or] Change a few comments / messages (minor) 24dba60 [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash 08fd788 [Andrew Or] Warn against external usages of SparkSubmitDriverBootstrapper ff34728 [Andrew Or] Minor comments 51aeb01 [Andrew Or] Filter out JVM memory in Scala rather than Bash (minor) 9a778f6 [Andrew Or] Fix PySpark: actually kill driver on termination d0f20db [Andrew Or] Don't pass empty library paths, classpath, java opts etc. a78cb26 [Andrew Or] Revert a few changes in utils.sh (minor) 9ba37e2 [Andrew Or] Don't barf when the properties file does not exist 8867a09 [Andrew Or] A few more naming things (minor) 19464ad [Andrew Or] SPARK_SUBMIT_JAVA_OPTS -> SPARK_SUBMIT_OPTS d6488f9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash 1ea6bbe [Andrew Or] SparkClassLauncher -> SparkSubmitDriverBootstrapper a91ea19 [Andrew Or] Fix precedence of library paths, classpath, java opts and memory 158f813 [Andrew Or] Remove "client mode" boolean argument c84f5c8 [Andrew Or] Remove debug print statement (minor) b71f52b [Andrew Or] Revert a few more changes (minor) 7d94a8d [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash 3a8235d [Andrew Or] Only parse the properties file if special configs exist c37e08d [Andrew Or] Revert a few more changes a396eda [Andrew Or] Nullify my own hard work to simplify bash 0effa1e [Andrew Or] Add code in Scala that handles special configs c886568 [Andrew Or] Fix lines too long + a few comments / style (minor) 7a4190a [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash 7396be2 [Andrew Or] Explicitly comment that multi-line properties are not supported fa11ef8 [Andrew Or] Parse the properties file only if the special configs exist 371cac4 [Andrew Or] Add function prefix (minor) be99eb3 [Andrew Or] Fix tests to not include multi-line configs bd0d468 [Andrew Or] Simplify parsing config file by ignoring multi-line arguments 56ac247 [Andrew Or] Use eval and set to simplify splitting 8d4614c [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash aeb79c7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash 2732ac0 [Andrew Or] Integrate BASH tests into dev/run-tests + log error properly 8d26a5c [Andrew Or] Add tests for bash/utils.sh 4ae24c3 [Andrew Or] Fix bug: escape properly in quote_java_property b3c4cd5 [Andrew Or] Fix bug: count the number of quotes instead of detecting presence c2273fc [Andrew Or] Fix typo (minor) e793e5f [Andrew Or] Handle multi-line arguments 5d8f8c4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-driver-extra c7b9926 [Andrew Or] Minor changes to spark-defaults.conf.template a992ae2 [Andrew Or] Escape spark.*.extraJavaOptions correctly aabfc7e [Andrew Or] escape -> split (minor) 45a1eb9 [Andrew Or] Fix bug: escape escaped backslashes and quotes properly... 1cdc6b1 [Andrew Or] Fix bug: escape escaped double quotes properly c854859 [Andrew Or] Add small comment c13a2cb [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-driver-extra 8e552b7 [Andrew Or] Include an example of spark.*.extraJavaOptions de765c9 [Andrew Or] Print spark-class command properly a4df3c4 [Andrew Or] Move parsing and escaping logic to utils.sh dec2343 [Andrew Or] Only export variables if they exist fa2136e [Andrew Or] Escape Java options + parse java properties files properly ef12f74 [Andrew Or] Minor formatting 4ec22a1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-driver-extra e5cfb46 [Andrew Or] Collapse duplicate code + fix potential whitespace issues 4edcaa8 [Andrew Or] Redirect stdout to stderr for python 130f295 [Andrew Or] Handle spark.driver.memory too 98dd8e3 [Andrew Or] Add warning if properties file does not exist 8843562 [Andrew Or] Fix compilation issues... 75ee6b4 [Andrew Or] Remove accidentally added file 63ed2e9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-driver-extra 0025474 [Andrew Or] Revert SparkSubmit handling of --driver-* options for only cluster mode a2ab1b0 [Andrew Or] Parse spark.driver.extra* in bash 250cb95 [Andrew Or] Do not ignore spark.driver.extra* for client mode
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala25
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala149
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala21
6 files changed, 183 insertions, 36 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 52c70712ee..be5ebfa921 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -40,28 +40,3 @@ private[spark] object PythonUtils {
paths.filter(_ != "").mkString(File.pathSeparator)
}
}
-
-
-/**
- * A utility class to redirect the child process's stdout or stderr.
- */
-private[spark] class RedirectThread(
- in: InputStream,
- out: OutputStream,
- name: String)
- extends Thread(name) {
-
- setDaemon(true)
- override def run() {
- scala.util.control.Exception.ignoring(classOf[IOException]) {
- // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
- val buf = new Array[Byte](1024)
- var len = in.read(buf)
- while (len != -1) {
- out.write(buf, 0, len)
- out.flush()
- len = in.read(buf)
- }
- }
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index bf716a8ab0..4c4796f6c5 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -17,7 +17,6 @@
package org.apache.spark.api.python
-import java.lang.Runtime
import java.io.{DataOutputStream, DataInputStream, InputStream, OutputStreamWriter}
import java.net.{InetAddress, ServerSocket, Socket, SocketException}
@@ -25,7 +24,7 @@ import scala.collection.mutable
import scala.collection.JavaConversions._
import org.apache.spark._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{RedirectThread, Utils}
private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 0d6751f3fa..b66c3ba4d5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -22,8 +22,8 @@ import java.net.URI
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
-import org.apache.spark.api.python.{PythonUtils, RedirectThread}
-import org.apache.spark.util.Utils
+import org.apache.spark.api.python.PythonUtils
+import org.apache.spark.util.{RedirectThread, Utils}
/**
* A main class used by spark-submit to launch Python applications. It executes python as a
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 318509a67a..f8cdbc3c39 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -195,18 +195,21 @@ object SparkSubmit {
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
// Other options
- OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER,
- sysProp = "spark.driver.extraClassPath"),
- OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER,
- sysProp = "spark.driver.extraJavaOptions"),
- OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
- sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
- sysProp = "spark.files")
+ sysProp = "spark.files"),
+
+ // Only process driver specific options for cluster mode here,
+ // because they have already been processed in bash for client mode
+ OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER,
+ sysProp = "spark.driver.extraClassPath"),
+ OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER,
+ sysProp = "spark.driver.extraJavaOptions"),
+ OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
+ sysProp = "spark.driver.extraLibraryPath")
)
// In client mode, launch the application main class directly
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
new file mode 100644
index 0000000000..af607e6a4a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.File
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.util.{RedirectThread, Utils}
+
+/**
+ * Launch an application through Spark submit in client mode with the appropriate classpath,
+ * library paths, java options and memory. These properties of the JVM must be set before the
+ * driver JVM is launched. The sole purpose of this class is to avoid handling the complexity
+ * of parsing the properties file for such relevant configs in Bash.
+ *
+ * Usage: org.apache.spark.deploy.SparkSubmitDriverBootstrapper <submit args>
+ */
+private[spark] object SparkSubmitDriverBootstrapper {
+
+ // Note: This class depends on the behavior of `bin/spark-class` and `bin/spark-submit`.
+ // Any changes made there must be reflected in this file.
+
+ def main(args: Array[String]): Unit = {
+
+ // This should be called only from `bin/spark-class`
+ if (!sys.env.contains("SPARK_CLASS")) {
+ System.err.println("SparkSubmitDriverBootstrapper must be called from `bin/spark-class`!")
+ System.exit(1)
+ }
+
+ val submitArgs = args
+ val runner = sys.env("RUNNER")
+ val classpath = sys.env("CLASSPATH")
+ val javaOpts = sys.env("JAVA_OPTS")
+ val defaultDriverMemory = sys.env("OUR_JAVA_MEM")
+
+ // Spark submit specific environment variables
+ val deployMode = sys.env("SPARK_SUBMIT_DEPLOY_MODE")
+ val propertiesFile = sys.env("SPARK_SUBMIT_PROPERTIES_FILE")
+ val bootstrapDriver = sys.env("SPARK_SUBMIT_BOOTSTRAP_DRIVER")
+ val submitDriverMemory = sys.env.get("SPARK_SUBMIT_DRIVER_MEMORY")
+ val submitLibraryPath = sys.env.get("SPARK_SUBMIT_LIBRARY_PATH")
+ val submitClasspath = sys.env.get("SPARK_SUBMIT_CLASSPATH")
+ val submitJavaOpts = sys.env.get("SPARK_SUBMIT_OPTS")
+
+ assume(runner != null, "RUNNER must be set")
+ assume(classpath != null, "CLASSPATH must be set")
+ assume(javaOpts != null, "JAVA_OPTS must be set")
+ assume(defaultDriverMemory != null, "OUR_JAVA_MEM must be set")
+ assume(deployMode == "client", "SPARK_SUBMIT_DEPLOY_MODE must be \"client\"!")
+ assume(propertiesFile != null, "SPARK_SUBMIT_PROPERTIES_FILE must be set")
+ assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set")
+
+ // Parse the properties file for the equivalent spark.driver.* configs
+ val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap
+ val confDriverMemory = properties.get("spark.driver.memory")
+ val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
+ val confClasspath = properties.get("spark.driver.extraClassPath")
+ val confJavaOpts = properties.get("spark.driver.extraJavaOptions")
+
+ // Favor Spark submit arguments over the equivalent configs in the properties file.
+ // Note that we do not actually use the Spark submit values for library path, classpath,
+ // and Java opts here, because we have already captured them in Bash.
+
+ val newDriverMemory = submitDriverMemory
+ .orElse(confDriverMemory)
+ .getOrElse(defaultDriverMemory)
+
+ val newLibraryPath =
+ if (submitLibraryPath.isDefined) {
+ // SPARK_SUBMIT_LIBRARY_PATH is already captured in JAVA_OPTS
+ ""
+ } else {
+ confLibraryPath.map("-Djava.library.path=" + _).getOrElse("")
+ }
+
+ val newClasspath =
+ if (submitClasspath.isDefined) {
+ // SPARK_SUBMIT_CLASSPATH is already captured in CLASSPATH
+ classpath
+ } else {
+ classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("")
+ }
+
+ val newJavaOpts =
+ if (submitJavaOpts.isDefined) {
+ // SPARK_SUBMIT_OPTS is already captured in JAVA_OPTS
+ javaOpts
+ } else {
+ javaOpts + confJavaOpts.map(" " + _).getOrElse("")
+ }
+
+ val filteredJavaOpts = Utils.splitCommandString(newJavaOpts)
+ .filterNot(_.startsWith("-Xms"))
+ .filterNot(_.startsWith("-Xmx"))
+
+ // Build up command
+ val command: Seq[String] =
+ Seq(runner) ++
+ Seq("-cp", newClasspath) ++
+ Seq(newLibraryPath) ++
+ filteredJavaOpts ++
+ Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++
+ Seq("org.apache.spark.deploy.SparkSubmit") ++
+ submitArgs
+
+ // Print the launch command. This follows closely the format used in `bin/spark-class`.
+ if (sys.env.contains("SPARK_PRINT_LAUNCH_COMMAND")) {
+ System.err.print("Spark Command: ")
+ System.err.println(command.mkString(" "))
+ System.err.println("========================================\n")
+ }
+
+ // Start the driver JVM
+ val filteredCommand = command.filter(_.nonEmpty)
+ val builder = new ProcessBuilder(filteredCommand)
+ val process = builder.start()
+
+ // Redirect stdin, stdout, and stderr to/from the child JVM
+ val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
+ val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
+ val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
+ stdinThread.start()
+ stdoutThread.start()
+ stderrThread.start()
+
+ // Terminate on broken pipe, which signals that the parent process has exited. This is
+ // important for the PySpark shell, where Spark submit itself is a python subprocess.
+ stdinThread.join()
+ process.destroy()
+ }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d6d74ce269..69a84a3604 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1480,3 +1480,24 @@ private[spark] object Utils extends Logging {
}
}
+
+/**
+ * A utility class to redirect the child process's stdout or stderr.
+ */
+private[spark] class RedirectThread(in: InputStream, out: OutputStream, name: String)
+ extends Thread(name) {
+
+ setDaemon(true)
+ override def run() {
+ scala.util.control.Exception.ignoring(classOf[IOException]) {
+ // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
+ val buf = new Array[Byte](1024)
+ var len = in.read(buf)
+ while (len != -1) {
+ out.write(buf, 0, len)
+ out.flush()
+ len = in.read(buf)
+ }
+ }
+ }
+}