From dafe343499bbc688e266106e4bb897f9e619834e Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 27 Aug 2014 23:03:46 -0700
Subject: [HOTFIX] Wait for EOF only for the PySpark shell

In `SparkSubmitDriverBootstrapper`, we wait for the parent process to send us
an `EOF` before finishing the application. This is applicable for the PySpark
shell because we terminate the application the same way. However, if we run a
Python application, for instance, the JVM never exits unless it receives a
manual EOF from the user. This is causing a few tests to time out.

We only need to do this for the PySpark shell because Spark submit runs as a
Python subprocess only in this case. Thus, the normal Spark shell doesn't need
to go through this code path even though it is also a REPL.

Thanks davies for reporting this.

Author: Andrew Or

Closes #2170 from andrewor14/bootstrap-hotfix and squashes the following commits:

42963f5 [Andrew Or] Do not wait for EOF unless this is the pyspark shell
---
 bin/pyspark                                        |  2 ++
 .../deploy/SparkSubmitDriverBootstrapper.scala     | 26 +++++++++++++---------
 2 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/bin/pyspark b/bin/pyspark
index 59cfdfa7c5..f553b314c5 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -102,6 +102,8 @@ if [[ "$1" =~ \.py$ ]]; then
   gatherSparkSubmitOpts "$@"
   exec $FWDIR/bin/spark-submit "${SUBMISSION_OPTS[@]}" $primary "${APPLICATION_OPTS[@]}"
 else
+  # PySpark shell requires special handling downstream
+  export PYSPARK_SHELL=1
   # Only use ipython if no command line arguments were provided [SPARK-1134]
   if [[ "$IPYTHON" = "1" ]]; then
     exec ${PYSPARK_PYTHON:-ipython} $IPYTHON_OPTS
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 7ca96ed57c..38b5d8e173 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -132,25 +132,29 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val builder = new ProcessBuilder(filteredCommand)
     val process = builder.start()
 
-    // Redirect stdin, stdout, and stderr to/from the child JVM
+    // Redirect stdout and stderr from the child JVM
     val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
     val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
     stdoutThread.start()
     stderrThread.start()
 
-    // In Windows, the subprocess reads directly from our stdin, so we should avoid spawning
-    // a thread that contends with the subprocess in reading from System.in.
-    if (Utils.isWindows) {
-      // For the PySpark shell, the termination of this process is handled in java_gateway.py
-      process.waitFor()
-    } else {
-      // Terminate on broken pipe, which signals that the parent process has exited. This is
-      // important for the PySpark shell, where Spark submit itself is a python subprocess.
+    // Redirect stdin to child JVM only if we're not running Windows. This is because the
+    // subprocess there already reads directly from our stdin, so we should avoid spawning a
+    // thread that contends with the subprocess in reading from System.in.
+    val isWindows = Utils.isWindows
+    val isPySparkShell = sys.env.contains("PYSPARK_SHELL")
+    if (!isWindows) {
       val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
       stdinThread.start()
-      stdinThread.join()
-      process.destroy()
+      // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM
+      // should terminate on broken pipe, which signals that the parent process has exited. In
+      // Windows, the termination logic for the PySpark shell is handled in java_gateway.py
+      if (isPySparkShell) {
+        stdinThread.join()
+        process.destroy()
+      }
     }
+    process.waitFor()
   }
 
 }
-- 
cgit v1.2.3
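
The gist of the change above is that the bootstrapper now keys its shutdown behavior off the PYSPARK_SHELL environment variable exported by bin/pyspark: only in the shell case does EOF on the JVM's stdin mean that the parent Python process has gone away. The following self-contained Scala sketch mirrors that gating logic outside of Spark. It is not part of the patch; the object name EofWaitSketch, the simplified Redirect class (a stand-in for Spark's RedirectThread utility), and the `cat` child command are hypothetical, and the Windows-specific branch is omitted.

import java.io.{InputStream, IOException, OutputStream}

// Hypothetical sketch: gate the "wait for EOF on stdin" behavior on the
// PYSPARK_SHELL environment variable, the same way the patched bootstrapper does.
object EofWaitSketch {

  // Simplified stand-in for Spark's RedirectThread: pump bytes from `in`
  // to `out` until EOF or a broken pipe.
  private class Redirect(in: InputStream, out: OutputStream, name: String)
    extends Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      val buf = new Array[Byte](1024)
      try {
        var n = in.read(buf)
        while (n != -1) {
          out.write(buf, 0, n)
          out.flush()
          n = in.read(buf)
        }
      } catch {
        case _: IOException => // broken pipe: the other end went away
      } finally {
        out.close()            // propagate EOF to the child's stdin
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Placeholder child process; the real bootstrapper launches a second JVM.
    val process = new ProcessBuilder("cat").start()

    val stdinThread = new Redirect(System.in, process.getOutputStream, "redirect stdin")
    stdinThread.start()

    // Only the PySpark shell runs spark-submit as a Python subprocess, so only
    // then does EOF on our stdin mean the parent has exited and we should stop.
    if (sys.env.contains("PYSPARK_SHELL")) {
      stdinThread.join()   // blocks until our stdin reaches EOF
      process.destroy()    // tear the child down instead of waiting on it
    }
    // In every case, wait for the child itself to finish before exiting.
    process.waitFor()
  }
}

Run with PYSPARK_SHELL=1 and close stdin (Ctrl-D) to see the child destroyed immediately; without the variable, the JVM simply waits for the child to exit on its own, which is the behavior the hotfix restores for non-shell Python applications.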