aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2014-12-23 16:02:59 -0800
committerJosh Rosen <joshrosen@databricks.com>2014-12-23 16:07:59 -0800
commit7e2deb71c4239564631b19c748e95c3d1aa1c77d (patch)
tree7a9508c8a566f0cc9cc6acc83c6f652aa8df1259 /core
parent3f5f4cc4e7b3bc458e0579d247a0652dca365853 (diff)
downloadspark-7e2deb71c4239564631b19c748e95c3d1aa1c77d.tar.gz
spark-7e2deb71c4239564631b19c748e95c3d1aa1c77d.tar.bz2
spark-7e2deb71c4239564631b19c748e95c3d1aa1c77d.zip
[SPARK-4606] Send EOF to child JVM when there's no more data to read.
Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #3460 from vanzin/SPARK-4606 and squashes the following commits: 031207d [Marcelo Vanzin] [SPARK-4606] Send EOF to child JVM when there's no more data to read.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala24
2 files changed, 19 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index d2687faad6..2eab998184 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -151,7 +151,8 @@ private[spark] object SparkSubmitDriverBootstrapper {
val isWindows = Utils.isWindows
val isSubprocess = sys.env.contains("IS_SUBPROCESS")
if (!isWindows) {
- val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
+ val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin",
+ propagateEof = true)
stdinThread.start()
// Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on
// broken pipe, signaling that the parent process has exited. This is the case if the
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8c00f2c36a..0d771baaa6 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1847,19 +1847,29 @@ private[spark] object Utils extends Logging {
/**
* A utility class to redirect the child process's stdout or stderr.
*/
-private[spark] class RedirectThread(in: InputStream, out: OutputStream, name: String)
+private[spark] class RedirectThread(
+ in: InputStream,
+ out: OutputStream,
+ name: String,
+ propagateEof: Boolean = false)
extends Thread(name) {
setDaemon(true)
override def run() {
scala.util.control.Exception.ignoring(classOf[IOException]) {
// FIXME: We copy the stream on the level of bytes to avoid encoding problems.
- val buf = new Array[Byte](1024)
- var len = in.read(buf)
- while (len != -1) {
- out.write(buf, 0, len)
- out.flush()
- len = in.read(buf)
+ try {
+ val buf = new Array[Byte](1024)
+ var len = in.read(buf)
+ while (len != -1) {
+ out.write(buf, 0, len)
+ out.flush()
+ len = in.read(buf)
+ }
+ } finally {
+ if (propagateEof) {
+ out.close()
+ }
}
}
}