aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-01-25 22:41:30 -0800
committerReynold Xin <rxin@apache.org>2014-01-25 22:41:30 -0800
commitc40619d4873f36ffb96a2e6292b32d5b64eab153 (patch)
tree08cf105683c13bac4a4c5a09290ffe026880ac21 /core
parentc66a2ef1c2dc9c218069b3ce8c39a49e5b92fc16 (diff)
parentf83068497ba42c5ea5c636efebca81f684e96177 (diff)
downloadspark-c40619d4873f36ffb96a2e6292b32d5b64eab153.tar.gz
spark-c40619d4873f36ffb96a2e6292b32d5b64eab153.tar.bz2
spark-c40619d4873f36ffb96a2e6292b32d5b64eab153.zip
Merge pull request #504 from JoshRosen/SPARK-1025
Fix PySpark hang when input files are deleted (SPARK-1025) This pull request addresses [SPARK-1025](https://spark-project.atlassian.net/browse/SPARK-1025), an issue where PySpark could hang if its input files were deleted.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala9
1 files changed, 9 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 57bde8d85f..70516bde8b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -52,6 +52,8 @@ private[spark] class PythonRDD[T: ClassTag](
val env = SparkEnv.get
val worker = env.createPythonWorker(pythonExec, envVars.toMap)
+ @volatile var readerException: Exception = null
+
// Start a thread to feed the process input from our parent's iterator
new Thread("stdin writer for " + pythonExec) {
override def run() {
@@ -82,6 +84,10 @@ private[spark] class PythonRDD[T: ClassTag](
dataOut.flush()
worker.shutdownOutput()
} catch {
+ case e: java.io.FileNotFoundException =>
+ readerException = e
+ // Kill the Python worker process:
+ worker.shutdownOutput()
case e: IOException =>
// This can happen for legitimate reasons if the Python code stops returning data before we are done
// passing elements through, e.g., for take(). Just log a message to say it happened.
@@ -106,6 +112,9 @@ private[spark] class PythonRDD[T: ClassTag](
}
private def read(): Array[Byte] = {
+ if (readerException != null) {
+ throw readerException
+ }
try {
stream.readInt() match {
case length if length > 0 =>