diff options
 core/src/main/scala/spark/api/python/PythonRDD.scala | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index 6b9ef62529..23e3149248 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -104,19 +104,17 @@ private[spark] class PythonRDD[T: ClassManifest](
   private def read(): Array[Byte] = {
     try {
       stream.readInt() match {
-        case length if length > 0 => {
+        case length if length > 0 =>
           val obj = new Array[Byte](length)
           stream.readFully(obj)
           obj
-        }
-        case -2 => {
+        case -2 =>
           // Signals that an exception has been thrown in python
           val exLength = stream.readInt()
           val obj = new Array[Byte](exLength)
           stream.readFully(obj)
           throw new PythonException(new String(obj))
-        }
-        case -1 => {
+        case -1 =>
           // We've finished the data section of the output, but we can still read some
           // accumulator updates; let's do that, breaking when we get EOFException
           while (true) {
@@ -124,9 +122,8 @@ private[spark] class PythonRDD[T: ClassManifest](
             val update = new Array[Byte](len2)
             stream.readFully(update)
             accumulator += Collections.singletonList(update)
+            new Array[Byte](0)
           }
-          new Array[Byte](0)
-        }
       }
     } catch {
       case eof: EOFException => {