author    haitao.yao <yao.erix@gmail.com>    2013-02-01 14:06:45 +0800
committer haitao.yao <yao.erix@gmail.com>    2013-02-01 14:06:45 +0800
commit    b57570fd12aa22baf55196e8fd7382d638fb1a37 (patch)
tree      9b8d675a1cfd2aa254e06aad6ccdf6130706e0e8 /core/src
parent    3190483b98a62b646311dff199417823efacbb47 (diff)
parent    7e2e046e37df6f71969a85f9995cb830be492050 (diff)
Merge branch 'mesos'
Diffstat (limited to 'core/src')
-rw-r--r-- core/src/main/scala/spark/api/python/PythonRDD.scala | 39 ++++++++++++++++++++---------------
1 file changed, 24 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index f43a152ca7..39758e94f4 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -103,21 +103,27 @@ private[spark] class PythonRDD[T: ClassManifest](
   private def read(): Array[Byte] = {
     try {
-      val length = stream.readInt()
-      if (length != -1) {
-        val obj = new Array[Byte](length)
-        stream.readFully(obj)
-        obj
-      } else {
-        // We've finished the data section of the output, but we can still read some
-        // accumulator updates; let's do that, breaking when we get EOFException
-        while (true) {
-          val len2 = stream.readInt()
-          val update = new Array[Byte](len2)
-          stream.readFully(update)
-          accumulator += Collections.singletonList(update)
-        }
-        new Array[Byte](0)
+      stream.readInt() match {
+        case length if length > 0 =>
+          val obj = new Array[Byte](length)
+          stream.readFully(obj)
+          obj
+        case -2 =>
+          // Signals that an exception has been thrown in python
+          val exLength = stream.readInt()
+          val obj = new Array[Byte](exLength)
+          stream.readFully(obj)
+          throw new PythonException(new String(obj))
+        case -1 =>
+          // We've finished the data section of the output, but we can still read some
+          // accumulator updates; let's do that, breaking when we get EOFException
+          while (true) {
+            val len2 = stream.readInt()
+            val update = new Array[Byte](len2)
+            stream.readFully(update)
+            accumulator += Collections.singletonList(update)
+          }
+          new Array[Byte](0)
       }
     } catch {
       case eof: EOFException => {
@@ -140,6 +146,9 @@ private[spark] class PythonRDD[T: ClassManifest](
   val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
 }
 
+/** Thrown for exceptions in user Python code. */
+private class PythonException(msg: String) extends Exception(msg)
+
 /**
  * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
  * This is used by PySpark's shuffle operations.
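
For context, the stream that read() consumes is length-framed: each record is a 4-byte big-endian length (written by DataOutputStream.writeInt, read by stream.readInt) followed by that many payload bytes, and the two negative sentinels carry control information: -2 announces a length-framed serialized Python error, while -1 marks the end of the data section, after which length-framed accumulator updates follow until the stream closes. Below is a minimal, self-contained Scala sketch of the writing side of this framing; the names FramingWriter, writeRecord, writeException, and finish are hypothetical and illustrate only the wire format, not PySpark's actual worker code.

import java.io.{ByteArrayOutputStream, DataOutputStream}

// Hypothetical writer for the framing that read() above expects.
object FramingWriter {

  // A normal record: a 4-byte length, then that many payload bytes.
  def writeRecord(out: DataOutputStream, payload: Array[Byte]): Unit = {
    out.writeInt(payload.length)
    out.write(payload)
  }

  // An error: the -2 sentinel, then a length-framed message, which
  // the reader turns into a PythonException.
  def writeException(out: DataOutputStream, message: String): Unit = {
    out.writeInt(-2)
    val bytes = message.getBytes
    out.writeInt(bytes.length)
    out.write(bytes)
  }

  // End of data: the -1 sentinel, then zero or more length-framed
  // accumulator updates. Closing the stream produces the EOFException
  // that terminates the reader's update loop.
  def finish(out: DataOutputStream, updates: Seq[Array[Byte]]): Unit = {
    out.writeInt(-1)
    for (u <- updates) {
      out.writeInt(u.length)
      out.write(u)
    }
    out.close()
  }

  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    writeRecord(out, "hello".getBytes)
    finish(out, Seq("acc-update".getBytes))
    println("wrote " + buf.size + " bytes")
  }
}

One consequence of the rewrite worth noting: the match handles only positive lengths and the -1/-2 sentinels, so a zero-length frame, which the old if/else returned as an empty array, would now fall through to a scala.MatchError.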