diff options
Diffstat (limited to 'core/src/main/scala/spark/serializer/Serializer.scala')
-rw-r--r-- | core/src/main/scala/spark/serializer/Serializer.scala | 32 |
1 files changed, 5 insertions, 27 deletions
diff --git a/core/src/main/scala/spark/serializer/Serializer.scala b/core/src/main/scala/spark/serializer/Serializer.scala index 50b086125a..aca86ab6f0 100644 --- a/core/src/main/scala/spark/serializer/Serializer.scala +++ b/core/src/main/scala/spark/serializer/Serializer.scala @@ -72,40 +72,18 @@ trait DeserializationStream { * Read the elements of this stream through an iterator. This can only be called once, as * reading each element will consume data from the input source. */ - def asIterator: Iterator[Any] = new Iterator[Any] { - var gotNext = false - var finished = false - var nextValue: Any = null - - private def getNext() { + def asIterator: Iterator[Any] = new spark.util.NextIterator[Any] { + override protected def getNext() = { try { - nextValue = readObject[Any]() + readObject[Any]() } catch { case eof: EOFException => finished = true } - gotNext = true } - override def hasNext: Boolean = { - if (!gotNext) { - getNext() - } - if (finished) { - close() - } - !finished - } - - override def next(): Any = { - if (!gotNext) { - getNext() - } - if (finished) { - throw new NoSuchElementException("End of stream") - } - gotNext = false - nextValue + override protected def close() { + DeserializationStream.this.close() } } } |