diff options
author | haitao.yao <yao.erix@gmail.com> | 2013-02-01 14:06:45 +0800 |
---|---|---|
committer | haitao.yao <yao.erix@gmail.com> | 2013-02-01 14:06:45 +0800 |
commit | b57570fd12aa22baf55196e8fd7382d638fb1a37 (patch) | |
tree | 9b8d675a1cfd2aa254e06aad6ccdf6130706e0e8 /core | |
parent | 3190483b98a62b646311dff199417823efacbb47 (diff) | |
parent | 7e2e046e37df6f71969a85f9995cb830be492050 (diff) | |
download | spark-b57570fd12aa22baf55196e8fd7382d638fb1a37.tar.gz spark-b57570fd12aa22baf55196e8fd7382d638fb1a37.tar.bz2 spark-b57570fd12aa22baf55196e8fd7382d638fb1a37.zip |
Merge branch 'mesos'
Diffstat (limited to 'core')
-rw-r--r-- | core/pom.xml | 11 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/python/PythonRDD.scala | 39 |
2 files changed, 24 insertions, 26 deletions
diff --git a/core/pom.xml b/core/pom.xml index 862d3ec37a..873e8a1d0f 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -163,11 +163,6 @@ <profiles> <profile> <id>hadoop1</id> - <activation> - <property> - <name>!hadoopVersion</name> - </property> - </activation> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> @@ -220,12 +215,6 @@ </profile> <profile> <id>hadoop2</id> - <activation> - <property> - <name>hadoopVersion</name> - <value>2</value> - </property> - </activation> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala index f43a152ca7..39758e94f4 100644 --- a/core/src/main/scala/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -103,21 +103,27 @@ private[spark] class PythonRDD[T: ClassManifest]( private def read(): Array[Byte] = { try { - val length = stream.readInt() - if (length != -1) { - val obj = new Array[Byte](length) - stream.readFully(obj) - obj - } else { - // We've finished the data section of the output, but we can still read some - // accumulator updates; let's do that, breaking when we get EOFException - while (true) { - val len2 = stream.readInt() - val update = new Array[Byte](len2) - stream.readFully(update) - accumulator += Collections.singletonList(update) - } - new Array[Byte](0) + stream.readInt() match { + case length if length > 0 => + val obj = new Array[Byte](length) + stream.readFully(obj) + obj + case -2 => + // Signals that an exception has been thrown in python + val exLength = stream.readInt() + val obj = new Array[Byte](exLength) + stream.readFully(obj) + throw new PythonException(new String(obj)) + case -1 => + // We've finished the data section of the output, but we can still read some + // accumulator updates; let's do that, breaking when we get EOFException + while (true) { + val len2 = stream.readInt() + val update = new Array[Byte](len2) + stream.readFully(update) + accumulator += Collections.singletonList(update) + } + new Array[Byte](0) } } catch { case eof: EOFException => { @@ -140,6 +146,9 @@ private[spark] class PythonRDD[T: ClassManifest]( val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) } +/** Thrown for exceptions in user Python code. */ +private class PythonException(msg: String) extends Exception(msg) + /** * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python. * This is used by PySpark's shuffle operations. |