diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2014-12-23 16:02:59 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-12-23 16:07:59 -0800 |
commit | 7e2deb71c4239564631b19c748e95c3d1aa1c77d (patch) | |
tree | 7a9508c8a566f0cc9cc6acc83c6f652aa8df1259 | |
parent | 3f5f4cc4e7b3bc458e0579d247a0652dca365853 (diff) | |
download | spark-7e2deb71c4239564631b19c748e95c3d1aa1c77d.tar.gz spark-7e2deb71c4239564631b19c748e95c3d1aa1c77d.tar.bz2 spark-7e2deb71c4239564631b19c748e95c3d1aa1c77d.zip |
[SPARK-4606] Send EOF to child JVM when there's no more data to read.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #3460 from vanzin/SPARK-4606 and squashes the following commits:
031207d [Marcelo Vanzin] [SPARK-4606] Send EOF to child JVM when there's no more data to read.
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala | 3 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 24 |
2 files changed, 19 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala index d2687faad6..2eab998184 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala @@ -151,7 +151,8 @@ private[spark] object SparkSubmitDriverBootstrapper { val isWindows = Utils.isWindows val isSubprocess = sys.env.contains("IS_SUBPROCESS") if (!isWindows) { - val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin") + val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin", + propagateEof = true) stdinThread.start() // Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on // broken pipe, signaling that the parent process has exited. This is the case if the diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 8c00f2c36a..0d771baaa6 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1847,19 +1847,29 @@ private[spark] object Utils extends Logging { /** * A utility class to redirect the child process's stdout or stderr. */ -private[spark] class RedirectThread(in: InputStream, out: OutputStream, name: String) +private[spark] class RedirectThread( + in: InputStream, + out: OutputStream, + name: String, + propagateEof: Boolean = false) extends Thread(name) { setDaemon(true) override def run() { scala.util.control.Exception.ignoring(classOf[IOException]) { // FIXME: We copy the stream on the level of bytes to avoid encoding problems. - val buf = new Array[Byte](1024) - var len = in.read(buf) - while (len != -1) { - out.write(buf, 0, len) - out.flush() - len = in.read(buf) + try { + val buf = new Array[Byte](1024) + var len = in.read(buf) + while (len != -1) { + out.write(buf, 0, len) + out.flush() + len = in.read(buf) + } + } finally { + if (propagateEof) { + out.close() + } } } } |