aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala25
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala149
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala21
6 files changed, 183 insertions, 36 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 52c70712ee..be5ebfa921 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -40,28 +40,3 @@ private[spark] object PythonUtils {
paths.filter(_ != "").mkString(File.pathSeparator)
}
}
-
-
-/**
- * A utility class to redirect the child process's stdout or stderr.
- */
-private[spark] class RedirectThread(
- in: InputStream,
- out: OutputStream,
- name: String)
- extends Thread(name) {
-
- setDaemon(true)
- override def run() {
- scala.util.control.Exception.ignoring(classOf[IOException]) {
- // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
- val buf = new Array[Byte](1024)
- var len = in.read(buf)
- while (len != -1) {
- out.write(buf, 0, len)
- out.flush()
- len = in.read(buf)
- }
- }
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index bf716a8ab0..4c4796f6c5 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -17,7 +17,6 @@
package org.apache.spark.api.python
-import java.lang.Runtime
import java.io.{DataOutputStream, DataInputStream, InputStream, OutputStreamWriter}
import java.net.{InetAddress, ServerSocket, Socket, SocketException}
@@ -25,7 +24,7 @@ import scala.collection.mutable
import scala.collection.JavaConversions._
import org.apache.spark._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{RedirectThread, Utils}
private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 0d6751f3fa..b66c3ba4d5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -22,8 +22,8 @@ import java.net.URI
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
-import org.apache.spark.api.python.{PythonUtils, RedirectThread}
-import org.apache.spark.util.Utils
+import org.apache.spark.api.python.PythonUtils
+import org.apache.spark.util.{RedirectThread, Utils}
/**
* A main class used by spark-submit to launch Python applications. It executes python as a
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 318509a67a..f8cdbc3c39 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -195,18 +195,21 @@ object SparkSubmit {
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
// Other options
- OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER,
- sysProp = "spark.driver.extraClassPath"),
- OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER,
- sysProp = "spark.driver.extraJavaOptions"),
- OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
- sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
- sysProp = "spark.files")
+ sysProp = "spark.files"),
+
+ // Only process driver specific options for cluster mode here,
+ // because they have already been processed in bash for client mode
+ OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER,
+ sysProp = "spark.driver.extraClassPath"),
+ OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER,
+ sysProp = "spark.driver.extraJavaOptions"),
+ OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
+ sysProp = "spark.driver.extraLibraryPath")
)
// In client mode, launch the application main class directly
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
new file mode 100644
index 0000000000..af607e6a4a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.File
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.util.{RedirectThread, Utils}
+
+/**
+ * Launch an application through Spark submit in client mode with the appropriate classpath,
+ * library paths, java options and memory. These properties of the JVM must be set before the
+ * driver JVM is launched. The sole purpose of this class is to avoid handling the complexity
+ * of parsing the properties file for such relevant configs in Bash.
+ *
+ * Usage: org.apache.spark.deploy.SparkSubmitDriverBootstrapper <submit args>
+ */
+private[spark] object SparkSubmitDriverBootstrapper {
+
+ // Note: This class depends on the behavior of `bin/spark-class` and `bin/spark-submit`.
+ // Any changes made there must be reflected in this file.
+
+ def main(args: Array[String]): Unit = {
+
+ // This should be called only from `bin/spark-class`
+ if (!sys.env.contains("SPARK_CLASS")) {
+ System.err.println("SparkSubmitDriverBootstrapper must be called from `bin/spark-class`!")
+ System.exit(1)
+ }
+
+ val submitArgs = args
+ val runner = sys.env("RUNNER")
+ val classpath = sys.env("CLASSPATH")
+ val javaOpts = sys.env("JAVA_OPTS")
+ val defaultDriverMemory = sys.env("OUR_JAVA_MEM")
+
+ // Spark submit specific environment variables
+ val deployMode = sys.env("SPARK_SUBMIT_DEPLOY_MODE")
+ val propertiesFile = sys.env("SPARK_SUBMIT_PROPERTIES_FILE")
+ val bootstrapDriver = sys.env("SPARK_SUBMIT_BOOTSTRAP_DRIVER")
+ val submitDriverMemory = sys.env.get("SPARK_SUBMIT_DRIVER_MEMORY")
+ val submitLibraryPath = sys.env.get("SPARK_SUBMIT_LIBRARY_PATH")
+ val submitClasspath = sys.env.get("SPARK_SUBMIT_CLASSPATH")
+ val submitJavaOpts = sys.env.get("SPARK_SUBMIT_OPTS")
+
+ assume(runner != null, "RUNNER must be set")
+ assume(classpath != null, "CLASSPATH must be set")
+ assume(javaOpts != null, "JAVA_OPTS must be set")
+ assume(defaultDriverMemory != null, "OUR_JAVA_MEM must be set")
+ assume(deployMode == "client", "SPARK_SUBMIT_DEPLOY_MODE must be \"client\"!")
+ assume(propertiesFile != null, "SPARK_SUBMIT_PROPERTIES_FILE must be set")
+ assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set")
+
+ // Parse the properties file for the equivalent spark.driver.* configs
+ val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap
+ val confDriverMemory = properties.get("spark.driver.memory")
+ val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
+ val confClasspath = properties.get("spark.driver.extraClassPath")
+ val confJavaOpts = properties.get("spark.driver.extraJavaOptions")
+
+ // Favor Spark submit arguments over the equivalent configs in the properties file.
+ // Note that we do not actually use the Spark submit values for library path, classpath,
+ // and Java opts here, because we have already captured them in Bash.
+
+ val newDriverMemory = submitDriverMemory
+ .orElse(confDriverMemory)
+ .getOrElse(defaultDriverMemory)
+
+ val newLibraryPath =
+ if (submitLibraryPath.isDefined) {
+ // SPARK_SUBMIT_LIBRARY_PATH is already captured in JAVA_OPTS
+ ""
+ } else {
+ confLibraryPath.map("-Djava.library.path=" + _).getOrElse("")
+ }
+
+ val newClasspath =
+ if (submitClasspath.isDefined) {
+ // SPARK_SUBMIT_CLASSPATH is already captured in CLASSPATH
+ classpath
+ } else {
+ classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("")
+ }
+
+ val newJavaOpts =
+ if (submitJavaOpts.isDefined) {
+ // SPARK_SUBMIT_OPTS is already captured in JAVA_OPTS
+ javaOpts
+ } else {
+ javaOpts + confJavaOpts.map(" " + _).getOrElse("")
+ }
+
+ val filteredJavaOpts = Utils.splitCommandString(newJavaOpts)
+ .filterNot(_.startsWith("-Xms"))
+ .filterNot(_.startsWith("-Xmx"))
+
+ // Build up command
+ val command: Seq[String] =
+ Seq(runner) ++
+ Seq("-cp", newClasspath) ++
+ Seq(newLibraryPath) ++
+ filteredJavaOpts ++
+ Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++
+ Seq("org.apache.spark.deploy.SparkSubmit") ++
+ submitArgs
+
+ // Print the launch command. This follows closely the format used in `bin/spark-class`.
+ if (sys.env.contains("SPARK_PRINT_LAUNCH_COMMAND")) {
+ System.err.print("Spark Command: ")
+ System.err.println(command.mkString(" "))
+ System.err.println("========================================\n")
+ }
+
+ // Start the driver JVM
+ val filteredCommand = command.filter(_.nonEmpty)
+ val builder = new ProcessBuilder(filteredCommand)
+ val process = builder.start()
+
+ // Redirect stdin, stdout, and stderr to/from the child JVM
+ val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
+ val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
+ val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
+ stdinThread.start()
+ stdoutThread.start()
+ stderrThread.start()
+
+ // Terminate on broken pipe, which signals that the parent process has exited. This is
+ // important for the PySpark shell, where Spark submit itself is a python subprocess.
+ stdinThread.join()
+ process.destroy()
+ }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d6d74ce269..69a84a3604 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1480,3 +1480,24 @@ private[spark] object Utils extends Logging {
}
}
+
+/**
+ * A utility class to redirect the child process's stdout or stderr.
+ */
+private[spark] class RedirectThread(in: InputStream, out: OutputStream, name: String)
+ extends Thread(name) {
+
+ setDaemon(true)
+ override def run() {
+ scala.util.control.Exception.ignoring(classOf[IOException]) {
+ // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
+ val buf = new Array[Byte](1024)
+ var len = in.read(buf)
+ while (len != -1) {
+ out.write(buf, 0, len)
+ out.flush()
+ len = in.read(buf)
+ }
+ }
+ }
+}