Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala  32
1 file changed, 21 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f9d86fed34..8a843fbb0e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -24,6 +24,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
+import scala.util.Try
import net.razorvine.pickle.{Pickler, Unpickler}
@@ -89,16 +90,22 @@ private[spark] class PythonRDD[T: ClassTag](
dataOut.flush()
worker.shutdownOutput()
} catch {
+
case e: java.io.FileNotFoundException =>
readerException = e
- // Kill the Python worker process:
- worker.shutdownOutput()
+ Try(worker.shutdownOutput()) // kill Python worker process
+
case e: IOException =>
// This can happen for legitimate reasons if the Python code stops returning data
- // before we are done passing elements through, e.g., for take(). Just log a message
- // to say it happened.
- logInfo("stdin writer to Python finished early")
- logDebug("stdin writer to Python finished early", e)
+ // before we are done passing elements through, e.g., for take(). Just log a message to
+ // say it happened (as it could also be hiding a real IOException from a data source).
+ logInfo("stdin writer to Python finished early (may not be an error)", e)
+
+ case e: Exception =>
+ // We must avoid throwing exceptions here, because the thread uncaught exception handler
+ // will kill the whole executor (see Executor).
+ readerException = e
+ Try(worker.shutdownOutput()) // kill Python worker process
}
}
}.start()
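
A note on the pattern in the hunk above: the writer thread records the failure in readerException and wraps the best-effort worker shutdown in scala.util.Try, so a secondary failure while closing the socket cannot mask the original error or escape to the thread's uncaught-exception handler. A minimal standalone sketch of the same idea follows; the names WriterSketch and sendAll and the Socket parameter are illustrative, not part of the Spark source.

import java.io.IOException
import java.net.Socket
import scala.util.Try

// Minimal sketch of the writer-thread error handling shown in the diff above.
object WriterSketch {
  @volatile private var readerException: Exception = null  // root cause, visible to the reader side

  def sendAll(worker: Socket, payload: Array[Byte]): Unit = {
    val out = worker.getOutputStream
    try {
      out.write(payload)
      out.flush()
      worker.shutdownOutput()
    } catch {
      case e: IOException =>
        // Record the root cause first, then try to kill the worker. If shutdownOutput()
        // itself throws, Try swallows it so readerException still holds the original error.
        readerException = e
        Try(worker.shutdownOutput())
    }
  }
}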
@@ -152,7 +159,7 @@ private[spark] class PythonRDD[T: ClassTag](
val exLength = stream.readInt()
val obj = new Array[Byte](exLength)
stream.readFully(obj)
- throw new PythonException(new String(obj))
+ throw new PythonException(new String(obj), readerException)
case SpecialLengths.END_OF_DATA_SECTION =>
// We've finished the data section of the output, but we can still
// read some accumulator updates:
@@ -167,10 +174,13 @@ private[spark] class PythonRDD[T: ClassTag](
Array.empty[Byte]
}
} catch {
- case eof: EOFException => {
+ case e: Exception if readerException != null =>
+ logError("Python worker exited unexpectedly (crashed)", e)
+ logError("Python crash may have been caused by prior exception:", readerException)
+ throw readerException
+
+ case eof: EOFException =>
throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
- }
- case e: Throwable => throw e
}
}
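
The reading side complements this: if the stream ends abruptly while readerException is set, the recorded writer-side failure is rethrown in preference to the secondary EOFException, since the EOF is usually a symptom rather than the cause. A hedged sketch of that preference follows; ReaderSketch, readNext, and the read callback are hypothetical names, not Spark APIs.

import java.io.EOFException

// Sketch: prefer the writer thread's recorded exception over the EOF seen by the reader.
object ReaderSketch {
  def readNext(read: () => Array[Byte], readerException: Exception): Array[Byte] = {
    try {
      read()
    } catch {
      case e: Exception if readerException != null =>
        // The writer thread already failed; that failure almost certainly explains the crash,
        // so surface it instead of the secondary exception seen here.
        throw readerException
      case eof: EOFException =>
        throw new RuntimeException("Python worker exited unexpectedly (crashed)", eof)
    }
  }
}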
@@ -185,7 +195,7 @@ private[spark] class PythonRDD[T: ClassTag](
}
/** Thrown for exceptions in user Python code. */
-private class PythonException(msg: String) extends Exception(msg)
+private class PythonException(msg: String, cause: Exception) extends RuntimeException(msg, cause)
/**
* Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.