diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-01-31 21:58:26 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-01-31 21:58:26 -0800 |
commit | 7e2e046e37df6f71969a85f9995cb830be492050 (patch) | |
tree | ff9fcba8cd041cb0ec95b97a73d97fd6ee113490 /core/src | |
parent | 95e14fbc38450a6d0b527d79e46ff08d69810cf3 (diff) | |
parent | 39ab83e9577a5449fb0d6ef944dffc0d7cd00b4a (diff) | |
download | spark-7e2e046e37df6f71969a85f9995cb830be492050.tar.gz spark-7e2e046e37df6f71969a85f9995cb830be492050.tar.bz2 spark-7e2e046e37df6f71969a85f9995cb830be492050.zip |
Merge pull request #434 from pwendell/python-exceptions
SPARK-673: Capture and re-throw Python exceptions
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/spark/api/python/PythonRDD.scala | 39 |
1 file changed, 24 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala index f43a152ca7..39758e94f4 100644 --- a/core/src/main/scala/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -103,21 +103,27 @@ private[spark] class PythonRDD[T: ClassManifest]( private def read(): Array[Byte] = { try { - val length = stream.readInt() - if (length != -1) { - val obj = new Array[Byte](length) - stream.readFully(obj) - obj - } else { - // We've finished the data section of the output, but we can still read some - // accumulator updates; let's do that, breaking when we get EOFException - while (true) { - val len2 = stream.readInt() - val update = new Array[Byte](len2) - stream.readFully(update) - accumulator += Collections.singletonList(update) - } - new Array[Byte](0) + stream.readInt() match { + case length if length > 0 => + val obj = new Array[Byte](length) + stream.readFully(obj) + obj + case -2 => + // Signals that an exception has been thrown in python + val exLength = stream.readInt() + val obj = new Array[Byte](exLength) + stream.readFully(obj) + throw new PythonException(new String(obj)) + case -1 => + // We've finished the data section of the output, but we can still read some + // accumulator updates; let's do that, breaking when we get EOFException + while (true) { + val len2 = stream.readInt() + val update = new Array[Byte](len2) + stream.readFully(update) + accumulator += Collections.singletonList(update) + } + new Array[Byte](0) } } catch { case eof: EOFException => { @@ -140,6 +146,9 @@ private[spark] class PythonRDD[T: ClassManifest]( val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) } +/** Thrown for exceptions in user Python code. */ +private class PythonException(msg: String) extends Exception(msg) + /** * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python. * This is used by PySpark's shuffle operations. |