author     Andrew Or <andrewor14@gmail.com>        2014-08-27 23:03:46 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-08-27 23:03:46 -0700
commit     dafe343499bbc688e266106e4bb897f9e619834e (patch)
tree       346f636c4305ea503e5214d1abf7436a1fe271fa /core
parent     f38fab97c7970168f1bd81d4dc202e36322c95e3 (diff)
[HOTFIX] Wait for EOF only for the PySpark shell
In `SparkSubmitDriverBootstrapper`, we wait for the parent process to send us an `EOF` before finishing the application. This is appropriate for the PySpark shell, because that is how we terminate the application there. However, if we run a standalone Python application, the JVM never exits unless it receives a manual EOF from the user. This was causing a few tests to time out.
We only need to wait for EOF in the PySpark shell, because that is the only case in which Spark submit runs as a Python subprocess. The normal Spark shell, although it is also a REPL, does not need to go through this code path.
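The stdin-redirect mechanism this fix gates can be sketched as follows. Spark's real helper is `RedirectThread` (in `org.apache.spark.util`); the version below is a simplified, self-contained reimplementation for illustration, not Spark's actual code. The key property is that `join()` on the redirect thread returns only when the source stream hits EOF, which for the PySpark shell signals that the parent Python process has exited:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

object RedirectPipeDemo {
  // Simplified stand-in for Spark's RedirectThread: a daemon thread that
  // copies bytes from `in` to `out` and finishes when `in` reaches EOF.
  // In the bootstrapper, a thread like this copies System.in into the
  // child JVM's stdin; EOF or a broken pipe on that stream means the
  // parent process has gone away, so joining on the thread tells the
  // bootstrapper when to destroy the child JVM.
  class RedirectThread(in: InputStream, out: OutputStream, name: String)
      extends Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      val buf = new Array[Byte](1024)
      var n = in.read(buf)
      while (n != -1) { // -1 signals EOF, e.g. the parent closed the pipe
        out.write(buf, 0, n)
        out.flush()
        n = in.read(buf)
      }
    }
  }

  // Pipe a string through a RedirectThread and return what comes out the
  // other end; join() returns only once the input stream hits EOF.
  def pipe(s: String): String = {
    val in  = new ByteArrayInputStream(s.getBytes("UTF-8"))
    val out = new ByteArrayOutputStream()
    val t   = new RedirectThread(in, out, "redirect demo")
    t.start()
    t.join()
    out.toString("UTF-8")
  }

  def main(args: Array[String]): Unit =
    println(pipe("hello"))
}
```

The fix below makes the `join()`/`destroy()` pair conditional on `PYSPARK_SHELL` being set, so a plain Python application's JVM no longer blocks waiting for an EOF that never arrives.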
Thanks davies for reporting this.
Author: Andrew Or <andrewor14@gmail.com>
Closes #2170 from andrewor14/bootstrap-hotfix and squashes the following commits:
42963f5 [Andrew Or] Do not wait for EOF unless this is the pyspark shell
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala | 26
1 file changed, 15 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 7ca96ed57c..38b5d8e173 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -132,25 +132,29 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val builder = new ProcessBuilder(filteredCommand)
     val process = builder.start()
 
-    // Redirect stdin, stdout, and stderr to/from the child JVM
+    // Redirect stdout and stderr from the child JVM
     val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
     val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
     stdoutThread.start()
     stderrThread.start()
 
-    // In Windows, the subprocess reads directly from our stdin, so we should avoid spawning
-    // a thread that contends with the subprocess in reading from System.in.
-    if (Utils.isWindows) {
-      // For the PySpark shell, the termination of this process is handled in java_gateway.py
-      process.waitFor()
-    } else {
-      // Terminate on broken pipe, which signals that the parent process has exited. This is
-      // important for the PySpark shell, where Spark submit itself is a python subprocess.
+    // Redirect stdin to child JVM only if we're not running Windows. This is because the
+    // subprocess there already reads directly from our stdin, so we should avoid spawning a
+    // thread that contends with the subprocess in reading from System.in.
+    val isWindows = Utils.isWindows
+    val isPySparkShell = sys.env.contains("PYSPARK_SHELL")
+    if (!isWindows) {
       val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
       stdinThread.start()
-      stdinThread.join()
-      process.destroy()
+      // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM
+      // should terminate on broken pipe, which signals that the parent process has exited. In
+      // Windows, the termination logic for the PySpark shell is handled in java_gateway.py
+      if (isPySparkShell) {
+        stdinThread.join()
+        process.destroy()
+      }
     }
+    process.waitFor()
   }
 }