aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/api/python/PythonRDD.scala11
1 files changed, 4 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index 6b9ef62529..23e3149248 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -104,19 +104,17 @@ private[spark] class PythonRDD[T: ClassManifest](
private def read(): Array[Byte] = {
try {
stream.readInt() match {
- case length if length > 0 => {
+ case length if length > 0 =>
val obj = new Array[Byte](length)
stream.readFully(obj)
obj
- }
- case -2 => {
+ case -2 =>
// Signals that an exception has been thrown in python
val exLength = stream.readInt()
val obj = new Array[Byte](exLength)
stream.readFully(obj)
throw new PythonException(new String(obj))
- }
- case -1 => {
+ case -1 =>
// We've finished the data section of the output, but we can still read some
// accumulator updates; let's do that, breaking when we get EOFException
while (true) {
@@ -124,9 +122,8 @@ private[spark] class PythonRDD[T: ClassManifest](
val update = new Array[Byte](len2)
stream.readFully(update)
accumulator += Collections.singletonList(update)
+ new Array[Byte](0)
}
- new Array[Byte](0)
- }
}
} catch {
case eof: EOFException => {