author    Andrew Or <andrewor14@gmail.com>    2014-08-27 23:03:46 -0700
committer Patrick Wendell <pwendell@gmail.com>    2014-08-27 23:03:46 -0700
commit    dafe343499bbc688e266106e4bb897f9e619834e (patch)
tree      346f636c4305ea503e5214d1abf7436a1fe271fa
parent    f38fab97c7970168f1bd81d4dc202e36322c95e3 (diff)
[HOTFIX] Wait for EOF only for the PySpark shell
In `SparkSubmitDriverBootstrapper`, we wait for the parent process to send us an `EOF` before finishing the application. This is applicable for the PySpark shell because we terminate the application the same way. However, if we run a python application, for instance, the JVM never actually exits unless it receives a manual EOF from the user. This is causing a few tests to time out.

We only need to wait for an `EOF` in the PySpark shell, because that is the only case in which Spark submit runs as a python subprocess. Thus, the normal Spark shell doesn't need to go through this code path even though it is also a REPL.

Thanks davies for reporting this.

Author: Andrew Or <andrewor14@gmail.com>

Closes #2170 from andrewor14/bootstrap-hotfix and squashes the following commits:

42963f5 [Andrew Or] Do not wait for EOF unless this is the pyspark shell
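For context, the parent-exit detection pattern the message describes boils down to the following minimal, self-contained sketch. This is illustrative only, not Spark's code: `EofTerminationSketch` and the use of `cat` as a stand-in child process are assumptions.

object EofTerminationSketch {
  def main(args: Array[String]): Unit = {
    // Spawn a stand-in "child JVM" (cat just echoes stdin; Unix-only example).
    val process = new ProcessBuilder("cat").start()

    // Pipe our stdin into the child. When the parent that launched us exits,
    // our stdin reaches EOF, read() returns -1, and the copy loop ends.
    val stdinThread = new Thread(new Runnable {
      def run(): Unit = {
        val out = process.getOutputStream
        val buf = new Array[Byte](1024)
        var n = System.in.read(buf)
        while (n != -1) {
          out.write(buf, 0, n)
          out.flush()
          n = System.in.read(buf)
        }
      }
    })
    stdinThread.start()
    stdinThread.join()   // blocks until the parent goes away (stdin EOF)
    process.destroy()    // then take the child down with us
  }
}

The hotfix below gates exactly this join-then-destroy step behind a check for the PySpark shell, so a plain python application no longer hangs waiting for an EOF that never comes.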
-rwxr-xr-x  bin/pyspark                                                                       |  2 ++
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala  | 26 +++++++++++++++-----------
2 files changed, 17 insertions(+), 11 deletions(-)
diff --git a/bin/pyspark b/bin/pyspark
index 59cfdfa7c5..f553b314c5 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -102,6 +102,8 @@ if [[ "$1" =~ \.py$ ]]; then
   gatherSparkSubmitOpts "$@"
   exec $FWDIR/bin/spark-submit "${SUBMISSION_OPTS[@]}" $primary "${APPLICATION_OPTS[@]}"
 else
+  # PySpark shell requires special handling downstream
+  export PYSPARK_SHELL=1
   # Only use ipython if no command line arguments were provided [SPARK-1134]
   if [[ "$IPYTHON" = "1" ]]; then
     exec ${PYSPARK_PYTHON:-ipython} $IPYTHON_OPTS
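A note on the hunk above: variables exported by bin/pyspark are inherited through the process environment by every child the script launches, which is how the flag reaches `SparkSubmitDriverBootstrapper` in the next diff. The JVM-side check mirrors the line added below:

// Set only when bin/pyspark launched us as the interactive shell;
// inherited through the process environment of the spark-submit JVM.
val isPySparkShell: Boolean = sys.env.contains("PYSPARK_SHELL")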
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 7ca96ed57c..38b5d8e173 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -132,25 +132,29 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val builder = new ProcessBuilder(filteredCommand)
     val process = builder.start()
 
-    // Redirect stdin, stdout, and stderr to/from the child JVM
+    // Redirect stdout and stderr from the child JVM
     val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
     val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
     stdoutThread.start()
     stderrThread.start()
 
-    // In Windows, the subprocess reads directly from our stdin, so we should avoid spawning
-    // a thread that contends with the subprocess in reading from System.in.
-    if (Utils.isWindows) {
-      // For the PySpark shell, the termination of this process is handled in java_gateway.py
-      process.waitFor()
-    } else {
-      // Terminate on broken pipe, which signals that the parent process has exited. This is
-      // important for the PySpark shell, where Spark submit itself is a python subprocess.
+    // Redirect stdin to child JVM only if we're not running Windows. This is because the
+    // subprocess there already reads directly from our stdin, so we should avoid spawning a
+    // thread that contends with the subprocess in reading from System.in.
+    val isWindows = Utils.isWindows
+    val isPySparkShell = sys.env.contains("PYSPARK_SHELL")
+    if (!isWindows) {
       val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
       stdinThread.start()
-      stdinThread.join()
-      process.destroy()
+      // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM
+      // should terminate on broken pipe, which signals that the parent process has exited. In
+      // Windows, the termination logic for the PySpark shell is handled in java_gateway.py
+      if (isPySparkShell) {
+        stdinThread.join()
+        process.destroy()
+      }
     }
+    process.waitFor()
   }
 }
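The diff leans on `RedirectThread` to pipe streams between the bootstrapper and the child JVM. As a rough illustration, assuming only the behavior visible in this diff, such a helper is a copy loop that exits on EOF. The real class lives in Spark's util package; `RedirectThreadSketch` below is a hypothetical stand-in, not Spark's implementation.

import java.io.{InputStream, OutputStream}

// Hypothetical stand-in for Spark's RedirectThread, inferred from its use
// above: copy bytes from `in` to `out` until EOF, then return. When `in` is
// System.in, hitting EOF is exactly the parent-exit signal that
// stdinThread.join() waits for in the patched code.
class RedirectThreadSketch(in: InputStream, out: OutputStream, name: String)
    extends Thread(name) {
  setDaemon(true)
  override def run(): Unit = {
    val buf = new Array[Byte](1024)
    var n = in.read(buf)
    while (n != -1) {   // -1 signals EOF on the source stream
      out.write(buf, 0, n)
      out.flush()
      n = in.read(buf)
    }
  }
}

With stdin redirection structured this way, `process.waitFor()` can run unconditionally at the end: a python application simply waits for the child JVM to finish, while the PySpark shell first joins the stdin thread and destroys the child when the parent shell exits.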