diff options
author | Reynold Xin <rxin@apache.org> | 2014-01-25 22:41:30 -0800 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-01-25 22:41:30 -0800 |
commit | c40619d4873f36ffb96a2e6292b32d5b64eab153 (patch) | |
tree | 08cf105683c13bac4a4c5a09290ffe026880ac21 /core | |
parent | c66a2ef1c2dc9c218069b3ce8c39a49e5b92fc16 (diff) | |
parent | f83068497ba42c5ea5c636efebca81f684e96177 (diff) | |
download | spark-c40619d4873f36ffb96a2e6292b32d5b64eab153.tar.gz spark-c40619d4873f36ffb96a2e6292b32d5b64eab153.tar.bz2 spark-c40619d4873f36ffb96a2e6292b32d5b64eab153.zip |
Merge pull request #504 from JoshRosen/SPARK-1025
Fix PySpark hang when input files are deleted (SPARK-1025)
This pull request addresses [SPARK-1025](https://spark-project.atlassian.net/browse/SPARK-1025), an issue where PySpark could hang if its input files were deleted.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 9 |
1 file changed, 9 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 57bde8d85f..70516bde8b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -52,6 +52,8 @@ private[spark] class PythonRDD[T: ClassTag](
     val env = SparkEnv.get
     val worker = env.createPythonWorker(pythonExec, envVars.toMap)
 
+    @volatile var readerException: Exception = null
+
     // Start a thread to feed the process input from our parent's iterator
     new Thread("stdin writer for " + pythonExec) {
       override def run() {
@@ -82,6 +84,10 @@ private[spark] class PythonRDD[T: ClassTag](
           dataOut.flush()
           worker.shutdownOutput()
         } catch {
+          case e: java.io.FileNotFoundException =>
+            readerException = e
+            // Kill the Python worker process:
+            worker.shutdownOutput()
           case e: IOException =>
             // This can happen for legitimate reasons if the Python code stops returning data before we are done
             // passing elements through, e.g., for take(). Just log a message to say it happened.
@@ -106,6 +112,9 @@ private[spark] class PythonRDD[T: ClassTag](
       }
 
       private def read(): Array[Byte] = {
+        if (readerException != null) {
+          throw readerException
+        }
         try {
           stream.readInt() match {
             case length if length > 0 =>